具体可以参考
https://bbs.tampermonkey.net.cn/thread-4027-1-1.html
存在一个
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
if (innerSubScheduler) {
executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
}
这里会循环调用doInnerSub(bufferedValue);来继续调用
如果buffer存在的观察者过多容易导致堆栈溢出问题
关于这个可以参考
https://github.com/ReactiveX/rxjs/issues/4055
https://github.com/ReactiveX/rxjs/issues/3609
https://github.com/ReactiveX/rxjs/pull/3621
需要使用queueScheduler
因为uQueueScheduler将递归循环变成了迭代循环
public flush(action: AsyncAction<any>): void {
const { actions } = this;
if (this._active) {
actions.push(action);
return;
}
let error: any;
this._active = true;
do {
if ((error = action.execute(action.state, action.delay))) {
break;
}
} while ((action = actions.shift()!)); // exhaust the scheduler queue
this._active = false;
if (error) {
while ((action = actions.shift()!)) {
action.unsubscribe();
}
throw error;
}
}
实现原理类似于https://bbs.tampermonkey.net.cn/thread-4026-1-1.html
所以如果可观测到存在大量的堆积问题请尝试使用调度器
官方宣称这是一种广度搜索和深度搜索的差异问题
但是我个人来说还是不喜欢的...
结语
撒花~