diff --git a/lib/observables/nopeObservable.ts b/lib/observables/nopeObservable.ts index 32a3fe8..4b7706b 100644 --- a/lib/observables/nopeObservable.ts +++ b/lib/observables/nopeObservable.ts @@ -9,6 +9,7 @@ import { BehaviorSubject, Observable, Subscription } from "rxjs"; import { generateId } from "../helpers/idMethods"; import { callImmediate } from "../helpers/runtimeMethods"; +import { getNopeLogger } from "../logger/getLogger"; import { INopeObservable, INopeObserver, @@ -21,6 +22,8 @@ import { IwaitForCallback } from "../types/nope/nopeObservable.interface"; +const logger = getNopeLogger("obervable"); + /** * RsJX based Observable. * @@ -45,14 +48,14 @@ export class NopeObservable */ public setter: | (( - value: S | null, - sender: string | null, - timeStamp: number | null, - ...data - ) => { - data: T | null; - valid: boolean; - }) + value: S | null, + sender: string | null, + timeStamp: number | null, + ...data + ) => { + data: T | null; + valid: boolean; + }) | null = null; /** @@ -209,6 +212,8 @@ export class NopeObservable return this._value as any as G; } + protected _lastDataUpdate: number; + /** * A Function to subscribe to updates of the Observable. * @param observer The Observer. Could be a Function or a Partial Observer. @@ -225,13 +230,20 @@ export class NopeObservable ): INopeObserver { const _this = this; - let active = false; - + let active = true; let _observer: IPartialObserver; + let _first = true; if (typeof observer === "object") { _observer = { next: (data: G) => { + // Make shure we are skipping the current Item, if desired + if (_first && options.skipCurrent) { + _first = false; + return; + } + _first = false; + if (active && data !== undefined && observer.next) { if (mode === "immediate") { callImmediate( @@ -265,6 +277,13 @@ export class NopeObservable } else if (typeof observer === "function") { _observer = { next: (data: G) => { + // Make shure we are skipping the current Item, if desired + if (_first && options.skipCurrent) { + _first = false; + return; + } + _first = false; + if (active && data !== undefined) { if (mode === "immediate") { callImmediate( @@ -279,8 +298,11 @@ export class NopeObservable } } }, - complete: () => {}, - error: (error) => {} + complete: () => { }, + error: (error) => { + logger.error(""); + logger.error(error); + } }; } @@ -299,12 +321,15 @@ export class NopeObservable active = true; - if (!options.skipCurrent) { - const _this = this; - callImmediate(() => { - _observer.next(_this.getContent()); - }); - } + // if (!options.skipCurrent) { + // const _this = this; + // const _start = Date.now(); + // callImmediate(() => { + // if (_this._lastDataUpdate < _start) { + // _observer.next(_this.getContent()); + // } + // }); + // } return ret; } @@ -374,53 +399,51 @@ export class NopeObservable ): Promise { const _this = this; - const _isAsync = testCallback[Symbol.toStringTag] === "AsyncFunction"; + let resolved = false; + let subscription: INopeObserver = null; - if (_isAsync) { - let called = false; - return new Promise(async (resolve, reject) => { - try { - if (options.testCurrent && (await testCallback(_this.getContent()))) { - resolve(_this.getContent()); - } else { - let subscription: INopeObserver = null; - subscription = _this.subscribe({ - next: async (content, ...args) => { - if (!called && (await testCallback(content, ...args))) { - subscription.unsubscribe(); - - called = true; - - resolve(content); - } - } - }); - } - } catch (e) { - reject(e); + return new Promise((resolve, reject) => { + const finish = (error: any, test: boolean, data: G) => { + // Reject the error. + if (error) { + reject(error); } - }); - } else { - return new Promise((resolve, reject) => { - try { - if (options.testCurrent && testCallback(_this.getContent())) { - resolve(_this.getContent()); - } else { - let subscription: INopeObserver = null; - subscription = _this.subscribe({ - next: (content, ...args) => { - if (testCallback(content, ...args)) { - subscription.unsubscribe(); - resolve(content); - } - } - }); - } - } catch (e) { - reject(e); + + // Unsubscribe the Subscription. + if (test && subscription) { + subscription.unsubscribe(); + subscription = null; } - }); - } + + if (test && !resolved) { + // Mark the Task as Resolved. + resolved = true; + resolve(data); + } + }; + + let first = true; + + const checkData = (data: G) => { + if (first && options.testCurrent || !first) { + // Create a promise of the data + const prom = Promise.resolve(testCallback(_this.getContent())); + + // Now we link the element + prom.catch(e => finish(e, false, data)); + prom.then(r => finish(false, r, data)); + } + + first = false; + + }; + + try { + subscription = _this.subscribe(data => checkData(data)); + } catch (e) { + reject(e); + } + }); } /** diff --git a/test/testNopeObservable.ts b/test/testNopeObservable.ts index 788f9a8..8b8d021 100644 --- a/test/testNopeObservable.ts +++ b/test/testNopeObservable.ts @@ -1,28 +1,25 @@ -import { filter } from 'rxjs/operators'; -import { nopeObservable } from "../lib/observables/nopeObservable"; +import { NopeObservable } from "../lib/observables/nopeObservable"; -const observable = new nopeObservable(); -const subscriptionSmaller = observable.observable.pipe( - filter((v, idx) => v < 10) -).subscribe((v) => { - console.log('smaller 10:', v) -}); - -const subscriptionGreater = observable.observable.pipe( - filter((v, idx) => v > 10) -).subscribe((v) => { - console.log('greater 10: ', v) -}); - -const subEnhanced = observable.enhancedSubscription((v) => console.log('enhanced', v), { - pipe: (scope, observable) => { - return observable.pipe(filter((v, idx) => v > 10)) +const observable = new NopeObservable(); +observable.setContent(0); +const observer = observable.subscribe({ + next: (...args) => { + console.log("Received the data", ...args); } }); - +const max = 10; let i = 1; -while (i < 20) { - observable.setContent(i++); - console.log('current', observable.getContent()); -} \ No newline at end of file + +while (i <= max) { + observable.setContent(i); + i++; +} + +i = 10; + +observable.subscribe((...args) => { + console.log("shoud not receive 10:", ...args); +}, "sync", { skipCurrent: true }); + +observable.setContent(1337); \ No newline at end of file diff --git a/test/testObservable.ts b/test/testObservable.ts index 90934a4..4cdd0cf 100644 --- a/test/testObservable.ts +++ b/test/testObservable.ts @@ -1,16 +1,16 @@ -import { nopeObservable } from "../lib/observables/nopeObservable"; +import { NopeObservable } from "../lib/observables/nopeObservable"; import { generateBenchmarkFunction } from "../modules/funcs/generateBenchmarkFunction"; let max = 10000000; -const benchmark = generateBenchmarkFunction(max, ''); -const observable = new nopeObservable(); +const benchmark = generateBenchmarkFunction(max, ""); +const observable = new NopeObservable(); max = max * 10; -let i = 0 +let i = 0; -let subscription = observable.subscribe({ +const subscription = observable.subscribe({ next: benchmark -}) +}); while (i < max) { observable.setContent(i);