李恒道 发表于 2023-1-21 00:53:16

Rxjs关于为什么需要subscribeOn(queueScheduler)的探索

# 前文
Rxjs的资料还是比较匮乏的
最近挺感兴趣的
就没事研究一下
可能存在事实性错误
# 正文
我们可以看一段比较有意思的代码
```js
const sourceA$ = scheduled(, queueScheduler);
const sourceB$ = scheduled(, queueScheduler);
const combine$ = combineLatest();
combine$
.pipe(
    subscribeOn(queueScheduler),
    map(() => {
      return a + b;
    })
)
.subscribe((result) => {
    console.log("result2", result);
});

```
这个执行顺序是1+3|2+3|2+4
但是如果删掉subscribeOn(queueScheduler)
就会变成
2+3|2+4
为什么会导致这样?
queueScheduler是一个排队函数
而subscribeOn是指订阅时所用的方式
为什么订阅可以影响到最后得到的结果?
我们可以看一下他的源码
```js
export function subscribeOn<T>(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
    subscriber.add(scheduler.schedule(() =>{
      source.subscribe(subscriber)
    }, delay));
});
}
```
这里相当于一个科里化
subscribeOn函数传入的是调度器函数
operate函数内传入的是source和subscriber
source是该operate的父观察者
subscriber是operate的子订阅者
然后执行了
```js
    subscriber.add(scheduler.schedule(() =>{
      source.subscribe(subscriber)
    }, delay));
```
subscriber.add根据调试时一个null值,内部有一个条件检测,所以add在本次调试无用
![图片.png](data/attachment/forum/202301/21/003728m13g6xllcv0yb3ia.png)
可以直接忽略,那剩下的就是
```js
scheduler.schedule(() =>{
      source.subscribe(subscriber)
    }, delay)
```
到底是不是这个函数起作用,我们可以试一下
```js
const sourceA$ = scheduled(, queueScheduler)
const sourceB$ = scheduled(, queueScheduler)
const combine$ = combineLatest();
const merge$ = combine$.pipe(
map(() => {
    return a + b;
})
);
const child = new SafeSubscriber(
(result) => {
    console.log("result", result);
},
null,
null
);

queueScheduler.schedule(() => {
merge$.subscribe(child);
}, 0);
```
发现结果没有变
但是如果把queueScheduler.schedule这些删掉,结果就又错了
所以关键在于queueScheduler.schedule的调度问题
我们去看一下这里的源码
```js
export const queueScheduler = new QueueScheduler(QueueAction);
```
可以看到初始化了一个QueueScheduler然后传入了QueueAction
```js
class AsyncScheduler extends Scheduler {
constructor(SchedulerAction: typeof Action, now: () => number = Scheduler.now) {
    super(SchedulerAction, now);
}
}
```
再往上看可以看到我们把QueueAction的构造函数赋值到了AsyncScheduler上的schedulerActionCtor上
```js
class Scheduler implements SchedulerLike {
constructor(private schedulerActionCtor: typeof Action, now: () => number = Scheduler.now) {
    this.now = now;
}
}
```
因为调用的是scheduler.schedule
```js
public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
    return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
}
```
发现初始化了schedulerActionCtor并传入this和回调函数work,然后调用schedule传入state和delay
我们传入的是QueueAction,所以去看QueueAction
```js
export class QueueAction<T> extends AsyncAction<T> {
constructor(protected scheduler: QueueScheduler, protected work: (this: SchedulerAction<T>, state?: T) => void) {
    super(scheduler, work);
}

public schedule(state?: T, delay: number = 0): Subscription {
    if (delay > 0) {
      return super.schedule(state, delay);
    }
    this.delay = delay;
    this.state = state;
    this.scheduler.flush(this);
    return this;
}

public execute(state: T, delay: number): any {
    return delay > 0 || this.closed ? super.execute(state, delay) : this._execute(state, delay);
}

protected requestAsyncId(scheduler: QueueScheduler, id?: TimerHandle, delay: number = 0): TimerHandle {
    // If delay exists and is greater than 0, or if the delay is null (the
    // action wasn't rescheduled) but was originally scheduled as an async
    // action, then recycle as an async action.

    if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
      return super.requestAsyncId(scheduler, id, delay);
    }

    // Otherwise flush the scheduler starting with this action.
    scheduler.flush(this);

    // HACK: In the past, this was returning `void`. However, `void` isn't a valid
    // `TimerHandle`, and generally the return value here isn't really used. So the
    // compromise is to return `0` which is both "falsy" and a valid `TimerHandle`,
    // as opposed to refactoring every other instanceo of `requestAsyncId`.
    return 0;
}
}
```
因为调用了他的schedule,可以发现她调用了this.scheduler.flush(this),把自身回传,scheduler则是刚才我们new this.schedulerActionCtor<T>(this, work)传入的this,那再返回去看flush函数
```js
public flush(action: AsyncAction<any>): void {
    const { actions } = this;

    if (this._active) {
      actions.push(action);
      return;
    }

    let error: any;
    this._active = true;

    do {
      if ((error = action.execute(action.state, action.delay))) {
      break;
      }
    } while ((action = actions.shift()!)); // exhaust the scheduler queue

    this._active = false;

    if (error) {
      while ((action = actions.shift()!)) {
      action.unsubscribe();
      }
      throw error;
    }
}
```
总结一下,这里每次调用传入一个action任务
如果正在执行,就投入到数组尾部,如果没有执行,那就直接开始执行完队列
所以我们的
```js
scheduler.schedule(() =>{
      source.subscribe(subscriber)
}, delay)
```
正是影响了这里的flush队列
我们来模拟一下在scheduler.schedule里调用和不在scheduler.schedule调用的情况
## 在scheduler.schedule里调用
首先插入一个事件,负责订阅,订阅会传导到我们可以看一段比较有意思的代码
```js
const sourceA$ = scheduled(, queueScheduler);
const sourceB$ = scheduled(, queueScheduler);
const combine$ = combineLatest();
combine$
.pipe(
    subscribeOn(queueScheduler),
    map(() => {
      return a + b;
    })
)
.subscribe((result) => {
    console.log("result2", result);
});

```
这个执行顺序是1+3|2+3|2+4
但是如果删掉subscribeOn(queueScheduler)
就会变成
2+3|2+4
为什么会导致这样?
queueScheduler是一个排队函数
而subscribeOn是指订阅时所用的方式
为什么订阅可以影响到最后得到的结果?
我们可以看一下他的源码
```js
export function subscribeOn<T>(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
    subscriber.add(scheduler.schedule(() =>{
      source.subscribe(subscriber)
    }, delay));
});
}
```
这里相当于一个科里化
subscribeOn函数传入的是调度器函数
operate函数内传入的是source和subscriber
source是该operate的父观察者
subscriber是operate的子订阅者
然后执行了
```js
    subscriber.add(scheduler.schedule(() =>{
      source.subscribe(subscriber)
    }, delay));
```
subscriber.add根据调试时一个null值,内部有一个条件检测,所以add在本次调试无用
![图片.png](data/attachment/forum/202301/21/003728m13g6xllcv0yb3ia.png)
可以直接忽略,那剩下的就是
```js
scheduler.schedule(() =>{
      source.subscribe(subscriber)
    }, delay)
```
到底是不是这个函数起作用,我们可以试一下
```js
const sourceA$ = scheduled(, queueScheduler)
const sourceB$ = scheduled(, queueScheduler)
const combine$ = combineLatest();
const merge$ = combine$.pipe(
map(() => {
    return a + b;
})
);
const child = new SafeSubscriber(
(result) => {
    console.log("result", result);
},
null,
null
);

queueScheduler.schedule(() => {
merge$.subscribe(child);
}, 0);
```
发现结果没有变
但是如果把queueScheduler.schedule这些删掉,结果就又错了
所以关键在于queueScheduler.schedule的调度问题
我们去看一下这里的源码
```js
export const queueScheduler = new QueueScheduler(QueueAction);
```
可以看到初始化了一个QueueScheduler然后传入了QueueAction
```js
class AsyncScheduler extends Scheduler {
constructor(SchedulerAction: typeof Action, now: () => number = Scheduler.now) {
    super(SchedulerAction, now);
}
}
```
再往上看可以看到我们把QueueAction的构造函数赋值到了AsyncScheduler上的schedulerActionCtor上
```js
class Scheduler implements SchedulerLike {
constructor(private schedulerActionCtor: typeof Action, now: () => number = Scheduler.now) {
    this.now = now;
}
}
```
因为调用的是scheduler.schedule
```js
public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
    return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
}
```
发现初始化了schedulerActionCtor并传入this和回调函数work,然后调用schedule传入state和delay
我们传入的是QueueAction,所以去看QueueAction
```js
export class QueueAction<T> extends AsyncAction<T> {
constructor(protected scheduler: QueueScheduler, protected work: (this: SchedulerAction<T>, state?: T) => void) {
    super(scheduler, work);
}

public schedule(state?: T, delay: number = 0): Subscription {
    if (delay > 0) {
      return super.schedule(state, delay);
    }
    this.delay = delay;
    this.state = state;
    this.scheduler.flush(this);
    return this;
}

public execute(state: T, delay: number): any {
    return delay > 0 || this.closed ? super.execute(state, delay) : this._execute(state, delay);
}

protected requestAsyncId(scheduler: QueueScheduler, id?: TimerHandle, delay: number = 0): TimerHandle {
    // If delay exists and is greater than 0, or if the delay is null (the
    // action wasn't rescheduled) but was originally scheduled as an async
    // action, then recycle as an async action.

    if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
      return super.requestAsyncId(scheduler, id, delay);
    }

    // Otherwise flush the scheduler starting with this action.
    scheduler.flush(this);

    // HACK: In the past, this was returning `void`. However, `void` isn't a valid
    // `TimerHandle`, and generally the return value here isn't really used. So the
    // compromise is to return `0` which is both "falsy" and a valid `TimerHandle`,
    // as opposed to refactoring every other instanceo of `requestAsyncId`.
    return 0;
}
}
```
因为调用了他的schedule,可以发现她调用了this.scheduler.flush(this),把自身回传,scheduler则是刚才我们new this.schedulerActionCtor<T>(this, work)传入的this,那再返回去看flush函数
```js
public flush(action: AsyncAction<any>): void {
    const { actions } = this;

    if (this._active) {
      actions.push(action);
      return;
    }

    let error: any;
    this._active = true;

    do {
      if ((error = action.execute(action.state, action.delay))) {
      break;
      }
    } while ((action = actions.shift()!)); // exhaust the scheduler queue

    this._active = false;

    if (error) {
      while ((action = actions.shift()!)) {
      action.unsubscribe();
      }
      throw error;
    }
}
```
总结一下,这里每次调用传入一个action任务
如果正在执行,就投入到数组尾部,如果没有执行,那就直接开始执行完队列
所以我们的
```js
scheduler.schedule(() =>{
      source.subscribe(subscriber)
}, delay)
```
正是影响了这里的flush队列
我们来模拟一下在scheduler.schedule里调用和不在scheduler.schedule调用的情况
## 在scheduler.schedule里调用
首先插入一个事件,负责订阅,订阅会传导到sourceA和sourceB
因为A和B都使用了scheduled(xxx, queueScheduler)
所以也会使用queueScheduler来做递归任务
那么此时会出现任务队列

订阅事件回调(订阅A以及B)
sourceA提取数据
sourceB提取数据

执行完订阅就会删除,并且按顺序一个一个执行
因为sourceA提取后还会继续插入队列
所以接下来的是

订阅事件回调(删除)
sourceA提取数据
sourceB提取数据
sourceA提取数据
sourceB提取数据
空数组

按照我们所想的顺序执行了~

## 不在scheduler.schedule里调用
直接在主线程进行订阅,而在combineLatest()会按顺序进行订阅,先订阅A

因为此时不存在action,所以立即执行a的第一个任务
a执行第一个任务后会立刻插入一个任务
直到执行完成后,才开始订阅B任务,所以是

订阅事件回调(订阅A)
sourceA提取数据
sourceA提取数据
订阅事件回调(订阅B)
sourceB提取数据
sourceB提取数据
空数组

因为combineLatest的特性,两个必须同时存在数据才有返回
所以A最后的结尾值是2,而B会按顺序触发3,4,自然得到了2+3,2+4两个值
那么我们就彻底理解了Scheduler理论~

# 结语
撒花~



页: [1]
查看完整版本: Rxjs关于为什么需要subscribeOn(queueScheduler)的探索