Fixing Observables

This commit is contained in:
Martin Karkowski 2021-10-18 08:02:29 +02:00
parent 582bcfb3a3
commit f6a86c30f3
3 changed files with 111 additions and 91 deletions

View File

@ -9,6 +9,7 @@
import { BehaviorSubject, Observable, Subscription } from "rxjs"; import { BehaviorSubject, Observable, Subscription } from "rxjs";
import { generateId } from "../helpers/idMethods"; import { generateId } from "../helpers/idMethods";
import { callImmediate } from "../helpers/runtimeMethods"; import { callImmediate } from "../helpers/runtimeMethods";
import { getNopeLogger } from "../logger/getLogger";
import { import {
INopeObservable, INopeObservable,
INopeObserver, INopeObserver,
@ -21,6 +22,8 @@ import {
IwaitForCallback IwaitForCallback
} from "../types/nope/nopeObservable.interface"; } from "../types/nope/nopeObservable.interface";
const logger = getNopeLogger("obervable");
/** /**
* RsJX based Observable. * RsJX based Observable.
* *
@ -209,6 +212,8 @@ export class NopeObservable<T, S = T, G = T>
return this._value as any as G; return this._value as any as G;
} }
protected _lastDataUpdate: number;
/** /**
* A Function to subscribe to updates of the Observable. * A Function to subscribe to updates of the Observable.
* @param observer The Observer. Could be a Function or a Partial Observer. * @param observer The Observer. Could be a Function or a Partial Observer.
@ -225,13 +230,20 @@ export class NopeObservable<T, S = T, G = T>
): INopeObserver { ): INopeObserver {
const _this = this; const _this = this;
let active = false; let active = true;
let _observer: IPartialObserver<G>; let _observer: IPartialObserver<G>;
let _first = true;
if (typeof observer === "object") { if (typeof observer === "object") {
_observer = { _observer = {
next: (data: G) => { next: (data: G) => {
// Make shure we are skipping the current Item, if desired
if (_first && options.skipCurrent) {
_first = false;
return;
}
_first = false;
if (active && data !== undefined && observer.next) { if (active && data !== undefined && observer.next) {
if (mode === "immediate") { if (mode === "immediate") {
callImmediate( callImmediate(
@ -265,6 +277,13 @@ export class NopeObservable<T, S = T, G = T>
} else if (typeof observer === "function") { } else if (typeof observer === "function") {
_observer = { _observer = {
next: (data: G) => { next: (data: G) => {
// Make shure we are skipping the current Item, if desired
if (_first && options.skipCurrent) {
_first = false;
return;
}
_first = false;
if (active && data !== undefined) { if (active && data !== undefined) {
if (mode === "immediate") { if (mode === "immediate") {
callImmediate( callImmediate(
@ -279,8 +298,11 @@ export class NopeObservable<T, S = T, G = T>
} }
} }
}, },
complete: () => {}, complete: () => { },
error: (error) => {} error: (error) => {
logger.error("");
logger.error(error);
}
}; };
} }
@ -299,12 +321,15 @@ export class NopeObservable<T, S = T, G = T>
active = true; active = true;
if (!options.skipCurrent) { // if (!options.skipCurrent) {
const _this = this; // const _this = this;
callImmediate(() => { // const _start = Date.now();
_observer.next(_this.getContent()); // callImmediate(() => {
}); // if (_this._lastDataUpdate < _start) {
} // _observer.next(_this.getContent());
// }
// });
// }
return ret; return ret;
} }
@ -374,54 +399,52 @@ export class NopeObservable<T, S = T, G = T>
): Promise<G> { ): Promise<G> {
const _this = this; const _this = this;
const _isAsync = testCallback[Symbol.toStringTag] === "AsyncFunction"; let resolved = false;
if (_isAsync) {
let called = false;
return new Promise<G>(async (resolve, reject) => {
try {
if (options.testCurrent && (await testCallback(_this.getContent()))) {
resolve(_this.getContent());
} else {
let subscription: INopeObserver = null; let subscription: INopeObserver = null;
subscription = _this.subscribe({
next: async (content, ...args) => {
if (!called && (await testCallback(content, ...args))) {
subscription.unsubscribe();
called = true;
resolve(content);
}
}
});
}
} catch (e) {
reject(e);
}
});
} else {
return new Promise<G>((resolve, reject) => { return new Promise<G>((resolve, reject) => {
try { const finish = (error: any, test: boolean, data: G) => {
if (options.testCurrent && testCallback(_this.getContent())) { // Reject the error.
resolve(_this.getContent()); if (error) {
} else { reject(error);
let subscription: INopeObserver = null; }
subscription = _this.subscribe({
next: (content, ...args) => { // Unsubscribe the Subscription.
if (testCallback(content, ...args)) { if (test && subscription) {
subscription.unsubscribe(); subscription.unsubscribe();
resolve(content); subscription = null;
} }
if (test && !resolved) {
// Mark the Task as Resolved.
resolved = true;
resolve(data);
} }
}); };
let first = true;
const checkData = (data: G) => {
if (first && options.testCurrent || !first) {
// Create a promise of the data
const prom = Promise.resolve(testCallback(_this.getContent()));
// Now we link the element
prom.catch(e => finish(e, false, data));
prom.then(r => finish(false, r, data));
} }
first = false;
};
try {
subscription = _this.subscribe(data => checkData(data));
} catch (e) { } catch (e) {
reject(e); reject(e);
} }
}); });
} }
}
/** /**
* Async Function to Wait for an Update * Async Function to Wait for an Update

View File

@ -1,28 +1,25 @@
import { filter } from 'rxjs/operators'; import { NopeObservable } from "../lib/observables/nopeObservable";
import { nopeObservable } from "../lib/observables/nopeObservable";
const observable = new nopeObservable<number>(); const observable = new NopeObservable<number>();
const subscriptionSmaller = observable.observable.pipe( observable.setContent(0);
filter((v, idx) => v < 10) const observer = observable.subscribe({
).subscribe((v) => { next: (...args) => {
console.log('smaller 10:', v) console.log("Received the data", ...args);
});
const subscriptionGreater = observable.observable.pipe(
filter((v, idx) => v > 10)
).subscribe((v) => {
console.log('greater 10: ', v)
});
const subEnhanced = observable.enhancedSubscription((v) => console.log('enhanced', v), {
pipe: (scope, observable) => {
return observable.pipe(filter((v, idx) => v > 10))
} }
}); });
const max = 10;
let i = 1; let i = 1;
while (i < 20) {
observable.setContent(i++); while (i <= max) {
console.log('current', observable.getContent()); observable.setContent(i);
i++;
} }
i = 10;
observable.subscribe((...args) => {
console.log("shoud not receive 10:", ...args);
}, "sync", { skipCurrent: true });
observable.setContent(1337);

View File

@ -1,16 +1,16 @@
import { nopeObservable } from "../lib/observables/nopeObservable"; import { NopeObservable } from "../lib/observables/nopeObservable";
import { generateBenchmarkFunction } from "../modules/funcs/generateBenchmarkFunction"; import { generateBenchmarkFunction } from "../modules/funcs/generateBenchmarkFunction";
let max = 10000000; let max = 10000000;
const benchmark = generateBenchmarkFunction(max, ''); const benchmark = generateBenchmarkFunction(max, "");
const observable = new nopeObservable<number>(); const observable = new NopeObservable<number>();
max = max * 10; max = max * 10;
let i = 0 let i = 0;
let subscription = observable.subscribe({ const subscription = observable.subscribe({
next: benchmark next: benchmark
}) });
while (i < max) { while (i < max) {
observable.setContent(i); observable.setContent(i);