Thank you for writing xstream, we use it heavily in a sizable project and have enjoyed using it.
There appears to be a subtle timing bug that seems to be caused by the way streams are asynchronously disconnected. The following unit test shows the bug. In the test, I have a producer (FiveProducer). that emits 5 but never completes, this is simply meant to mimic a stream that emits a value and may possibly emit more in the future. This producer has .compose(dropRepeats()).remember() added.
If I subscribe to this stream once (sub1 / spy1), and then unsubscribe from it, wait zero milliseconds, then subscribe a second time (spy2), I would expect both spy1 and spy2 to receive the value 5. However, spy2 does not.
class FiveProducer implements InternalProducer<number> {
public type = 'FiveProducer';
_start(out: InternalListener<number>): void {
out._n(5);
}
_stop(): void {}
}
test('Verify dropRepeats behaviour', async () => {
const stream = new Stream(new FiveProducer()).compose(dropRepeats()).remember();
const spy1 = jest.fn();
const sub1 = stream.subscribe({ next: spy1 });
sub1.unsubscribe();
// This is important -- must wait exactly zero.
await jest.advanceTimersByTimeAsync(0);
const spy2 = jest.fn();
stream.subscribe({ next: spy2 });
await jest.advanceTimersByTimeAsync(10);
// This succeeds -- spy1 is called once with the right value
expect(spy1).toHaveBeenCalledTimes(1);
expect(spy1).toHaveBeenNthCalledWith(1, 5);
// This fails -- spy2 is not called
expect(spy2).toHaveBeenCalledTimes(1);
expect(spy2).toHaveBeenNthCalledWith(1, 5);
});
What seems to be happening is:
sub1 is unsubscribed, which schedules an asynchronous stop of the .remember() stream.
- Asynchronously, the
.remember stream is stopped, which causes an asynchronous operation to be scheduled, to stop the .compose stream
- However, before this runs, my second spy attempts to subscribe to the stream. This starts the
.remember() stream, which doesn't emit a remembered value immediately (it has been stopped, and .has==false. Instead it starts the .compose stream.
- The
.compose stream runs this code, which cancels the stop timer, but does not cause any other action to happen:
if (this._stopID !== NO) {
clearTimeout(this._stopID);
this._stopID = NO;
}
The end effect is that the stream never receives any data at all. This feels like a race condition bug to me, but I do not know exactly where/how to fix this. I'd appreciate any advice, as we run into this bug somewhat often, and unpredictably, since our app heavily uses many streams asynchronously.
Thank you!
Thank you for writing
xstream, we use it heavily in a sizable project and have enjoyed using it.There appears to be a subtle timing bug that seems to be caused by the way streams are asynchronously disconnected. The following unit test shows the bug. In the test, I have a producer (
FiveProducer). that emits5but never completes, this is simply meant to mimic a stream that emits a value and may possibly emit more in the future. This producer has.compose(dropRepeats()).remember()added.If I subscribe to this stream once (
sub1/spy1), and then unsubscribe from it, wait zero milliseconds, then subscribe a second time (spy2), I would expect bothspy1andspy2to receive the value5. However,spy2does not.What seems to be happening is:
sub1is unsubscribed, which schedules an asynchronous stop of the.remember()stream..rememberstream is stopped, which causes an asynchronous operation to be scheduled, to stop the.composestream.remember()stream, which doesn't emit a remembered value immediately (it has been stopped, and.has==false. Instead it starts the.composestream..composestream runs this code, which cancels the stop timer, but does not cause any other action to happen:The end effect is that the stream never receives any data at all. This feels like a race condition bug to me, but I do not know exactly where/how to fix this. I'd appreciate any advice, as we run into this bug somewhat often, and unpredictably, since our app heavily uses many streams asynchronously.
Thank you!