上一主题 下一主题
ScriptCat,新一代的脚本管理器脚本站,与全世界分享你的用户脚本油猴脚本开发指南教程目录
返回列表 发新帖

Rxjs关于为什么需要subscribeOn(queueScheduler)的探索

[复制链接]
  • TA的每日心情
    开心
    2023-2-28 23:59
  • 签到天数: 191 天

    [LV.7]常住居民III

    687

    主题

    5424

    回帖

    6362

    积分

    管理员

    非物质文化遗产社会摇传承人

    积分
    6362

    荣誉开发者管理员油中2周年生态建设者喜迎中秋

    发表于 2023-1-21 00:53:16 | 显示全部楼层 | 阅读模式

    前文

    Rxjs的资料还是比较匮乏的
    最近挺感兴趣的
    就没事研究一下
    可能存在事实性错误

    正文

    我们可以看一段比较有意思的代码

    const sourceA$ = scheduled([1, 2], queueScheduler);
    const sourceB$ = scheduled([3, 4], queueScheduler);
    const combine$ = combineLatest([sourceA$, sourceB$]);
    combine$
      .pipe(
        subscribeOn(queueScheduler),
        map(([a, b]) => {
          return a + b;
        })
      )
      .subscribe((result) => {
        console.log("result2", result);
      });
    

    这个执行顺序是1+3|2+3|2+4
    但是如果删掉subscribeOn(queueScheduler)
    就会变成
    2+3|2+4
    为什么会导致这样?
    queueScheduler是一个排队函数
    而subscribeOn是指订阅时所用的方式
    为什么订阅可以影响到最后得到的结果?
    我们可以看一下他的源码

    export function subscribeOn<T>(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction<T> {
      return operate((source, subscriber) => {
        subscriber.add(scheduler.schedule(() =>{
          source.subscribe(subscriber)
        }, delay));
      });
    }

    这里相当于一个科里化
    subscribeOn函数传入的是调度器函数
    operate函数内传入的是source和subscriber
    source是该operate的父观察者
    subscriber是operate的子订阅者
    然后执行了

        subscriber.add(scheduler.schedule(() =>{
          source.subscribe(subscriber)
        }, delay));

    subscriber.add根据调试时一个null值,内部有一个条件检测,所以add在本次调试无用
    图片.png
    可以直接忽略,那剩下的就是

    scheduler.schedule(() =>{
          source.subscribe(subscriber)
        }, delay)

    到底是不是这个函数起作用,我们可以试一下

    const sourceA$ = scheduled([1, 2], queueScheduler)
    const sourceB$ = scheduled([3, 4], queueScheduler)
    const combine$ = combineLatest([sourceA$, sourceB$]);
    const merge$ = combine$.pipe(
      map(([a, b]) => {
        return a + b;
      })
    );
    const child = new SafeSubscriber(
      (result) => {
        console.log("result", result);
      },
      null,
      null
    );
    
    queueScheduler.schedule(() => {
      merge$.subscribe(child);
    }, 0);

    发现结果没有变
    但是如果把queueScheduler.schedule这些删掉,结果就又错了
    所以关键在于queueScheduler.schedule的调度问题
    我们去看一下这里的源码

    export const queueScheduler = new QueueScheduler(QueueAction);

    可以看到初始化了一个QueueScheduler然后传入了QueueAction

    class AsyncScheduler extends Scheduler {
      constructor(SchedulerAction: typeof Action, now: () => number = Scheduler.now) {
        super(SchedulerAction, now);
      }
    }

    再往上看可以看到我们把QueueAction的构造函数赋值到了AsyncScheduler上的schedulerActionCtor上

    class Scheduler implements SchedulerLike {
      constructor(private schedulerActionCtor: typeof Action, now: () => number = Scheduler.now) {
        this.now = now;
      }
    }

    因为调用的是scheduler.schedule

      public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
        return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
      }

    发现初始化了schedulerActionCtor并传入this和回调函数work,然后调用schedule传入state和delay
    我们传入的是QueueAction,所以去看QueueAction

    export class QueueAction<T> extends AsyncAction<T> {
      constructor(protected scheduler: QueueScheduler, protected work: (this: SchedulerAction<T>, state?: T) => void) {
        super(scheduler, work);
      }
    
      public schedule(state?: T, delay: number = 0): Subscription {
        if (delay > 0) {
          return super.schedule(state, delay);
        }
        this.delay = delay;
        this.state = state;
        this.scheduler.flush(this);
        return this;
      }
    
      public execute(state: T, delay: number): any {
        return delay > 0 || this.closed ? super.execute(state, delay) : this._execute(state, delay);
      }
    
      protected requestAsyncId(scheduler: QueueScheduler, id?: TimerHandle, delay: number = 0): TimerHandle {
        // If delay exists and is greater than 0, or if the delay is null (the
        // action wasn't rescheduled) but was originally scheduled as an async
        // action, then recycle as an async action.
    
        if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
          return super.requestAsyncId(scheduler, id, delay);
        }
    
        // Otherwise flush the scheduler starting with this action.
        scheduler.flush(this);
    
        // HACK: In the past, this was returning `void`. However, `void` isn't a valid
        // `TimerHandle`, and generally the return value here isn't really used. So the
        // compromise is to return `0` which is both "falsy" and a valid `TimerHandle`,
        // as opposed to refactoring every other instanceo of `requestAsyncId`.
        return 0;
      }
    }

    因为调用了他的schedule,可以发现她调用了this.scheduler.flush(this),把自身回传,scheduler则是刚才我们new this.schedulerActionCtor(this, work)传入的this,那再返回去看flush函数

      public flush(action: AsyncAction<any>): void {
        const { actions } = this;
    
        if (this._active) {
          actions.push(action);
          return;
        }
    
        let error: any;
        this._active = true;
    
        do {
          if ((error = action.execute(action.state, action.delay))) {
            break;
          }
        } while ((action = actions.shift()!)); // exhaust the scheduler queue
    
        this._active = false;
    
        if (error) {
          while ((action = actions.shift()!)) {
            action.unsubscribe();
          }
          throw error;
        }
      }

    总结一下,这里每次调用传入一个action任务
    如果正在执行,就投入到数组尾部,如果没有执行,那就直接开始执行完队列
    所以我们的

    scheduler.schedule(() =>{
          source.subscribe(subscriber)
    }, delay)

    正是影响了这里的flush队列
    我们来模拟一下在scheduler.schedule里调用和不在scheduler.schedule调用的情况

    在scheduler.schedule里调用

    首先插入一个事件,负责订阅,订阅会传导到我们可以看一段比较有意思的代码

    const sourceA$ = scheduled([1, 2], queueScheduler);
    const sourceB$ = scheduled([3, 4], queueScheduler);
    const combine$ = combineLatest([sourceA$, sourceB$]);
    combine$
      .pipe(
        subscribeOn(queueScheduler),
        map(([a, b]) => {
          return a + b;
        })
      )
      .subscribe((result) => {
        console.log("result2", result);
      });
    

    这个执行顺序是1+3|2+3|2+4
    但是如果删掉subscribeOn(queueScheduler)
    就会变成
    2+3|2+4
    为什么会导致这样?
    queueScheduler是一个排队函数
    而subscribeOn是指订阅时所用的方式
    为什么订阅可以影响到最后得到的结果?
    我们可以看一下他的源码

    export function subscribeOn<T>(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction<T> {
      return operate((source, subscriber) => {
        subscriber.add(scheduler.schedule(() =>{
          source.subscribe(subscriber)
        }, delay));
      });
    }

    这里相当于一个科里化
    subscribeOn函数传入的是调度器函数
    operate函数内传入的是source和subscriber
    source是该operate的父观察者
    subscriber是operate的子订阅者
    然后执行了

        subscriber.add(scheduler.schedule(() =>{
          source.subscribe(subscriber)
        }, delay));

    subscriber.add根据调试时一个null值,内部有一个条件检测,所以add在本次调试无用
    图片.png
    可以直接忽略,那剩下的就是

    scheduler.schedule(() =>{
          source.subscribe(subscriber)
        }, delay)

    到底是不是这个函数起作用,我们可以试一下

    const sourceA$ = scheduled([1, 2], queueScheduler)
    const sourceB$ = scheduled([3, 4], queueScheduler)
    const combine$ = combineLatest([sourceA$, sourceB$]);
    const merge$ = combine$.pipe(
      map(([a, b]) => {
        return a + b;
      })
    );
    const child = new SafeSubscriber(
      (result) => {
        console.log("result", result);
      },
      null,
      null
    );
    
    queueScheduler.schedule(() => {
      merge$.subscribe(child);
    }, 0);

    发现结果没有变
    但是如果把queueScheduler.schedule这些删掉,结果就又错了
    所以关键在于queueScheduler.schedule的调度问题
    我们去看一下这里的源码

    export const queueScheduler = new QueueScheduler(QueueAction);

    可以看到初始化了一个QueueScheduler然后传入了QueueAction

    class AsyncScheduler extends Scheduler {
      constructor(SchedulerAction: typeof Action, now: () => number = Scheduler.now) {
        super(SchedulerAction, now);
      }
    }

    再往上看可以看到我们把QueueAction的构造函数赋值到了AsyncScheduler上的schedulerActionCtor上

    class Scheduler implements SchedulerLike {
      constructor(private schedulerActionCtor: typeof Action, now: () => number = Scheduler.now) {
        this.now = now;
      }
    }

    因为调用的是scheduler.schedule

      public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
        return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
      }

    发现初始化了schedulerActionCtor并传入this和回调函数work,然后调用schedule传入state和delay
    我们传入的是QueueAction,所以去看QueueAction

    export class QueueAction<T> extends AsyncAction<T> {
      constructor(protected scheduler: QueueScheduler, protected work: (this: SchedulerAction<T>, state?: T) => void) {
        super(scheduler, work);
      }
    
      public schedule(state?: T, delay: number = 0): Subscription {
        if (delay > 0) {
          return super.schedule(state, delay);
        }
        this.delay = delay;
        this.state = state;
        this.scheduler.flush(this);
        return this;
      }
    
      public execute(state: T, delay: number): any {
        return delay > 0 || this.closed ? super.execute(state, delay) : this._execute(state, delay);
      }
    
      protected requestAsyncId(scheduler: QueueScheduler, id?: TimerHandle, delay: number = 0): TimerHandle {
        // If delay exists and is greater than 0, or if the delay is null (the
        // action wasn't rescheduled) but was originally scheduled as an async
        // action, then recycle as an async action.
    
        if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
          return super.requestAsyncId(scheduler, id, delay);
        }
    
        // Otherwise flush the scheduler starting with this action.
        scheduler.flush(this);
    
        // HACK: In the past, this was returning `void`. However, `void` isn't a valid
        // `TimerHandle`, and generally the return value here isn't really used. So the
        // compromise is to return `0` which is both "falsy" and a valid `TimerHandle`,
        // as opposed to refactoring every other instanceo of `requestAsyncId`.
        return 0;
      }
    }

    因为调用了他的schedule,可以发现她调用了this.scheduler.flush(this),把自身回传,scheduler则是刚才我们new this.schedulerActionCtor(this, work)传入的this,那再返回去看flush函数

      public flush(action: AsyncAction<any>): void {
        const { actions } = this;
    
        if (this._active) {
          actions.push(action);
          return;
        }
    
        let error: any;
        this._active = true;
    
        do {
          if ((error = action.execute(action.state, action.delay))) {
            break;
          }
        } while ((action = actions.shift()!)); // exhaust the scheduler queue
    
        this._active = false;
    
        if (error) {
          while ((action = actions.shift()!)) {
            action.unsubscribe();
          }
          throw error;
        }
      }

    总结一下,这里每次调用传入一个action任务
    如果正在执行,就投入到数组尾部,如果没有执行,那就直接开始执行完队列
    所以我们的

    scheduler.schedule(() =>{
          source.subscribe(subscriber)
    }, delay)

    正是影响了这里的flush队列
    我们来模拟一下在scheduler.schedule里调用和不在scheduler.schedule调用的情况

    在scheduler.schedule里调用

    首先插入一个事件,负责订阅,订阅会传导到sourceA和sourceB
    因为A和B都使用了scheduled(xxx, queueScheduler)
    所以也会使用queueScheduler来做递归任务
    那么此时会出现任务队列

    订阅事件回调(订阅A以及B)
    sourceA提取数据
    sourceB提取数据

    执行完订阅就会删除,并且按顺序一个一个执行
    因为sourceA提取后还会继续插入队列
    所以接下来的是

    订阅事件回调(删除)
    sourceA提取数据
    sourceB提取数据
    sourceA提取数据
    sourceB提取数据
    空数组

    按照我们所想的顺序执行了~

    不在scheduler.schedule里调用

    直接在主线程进行订阅,而在combineLatest([sourceA$, sourceB$])会按顺序进行订阅,先订阅A

    因为此时不存在action,所以立即执行a的第一个任务
    a执行第一个任务后会立刻插入一个任务
    直到执行完成后,才开始订阅B任务,所以是

    订阅事件回调(订阅A)
    sourceA提取数据
    sourceA提取数据
    订阅事件回调(订阅B)
    sourceB提取数据
    sourceB提取数据
    空数组

    因为combineLatest的特性,两个必须同时存在数据才有返回
    所以A最后的结尾值是2,而B会按顺序触发3,4,自然得到了2+3,2+4两个值
    那么我们就彻底理解了Scheduler理论~

    结语

    撒花~

    混的人。
    ------------------------------------------
    進撃!永遠の帝国の破壊虎---李恒道

    入驻了爱发电https://afdian.net/a/lihengdao666
    个人宣言:この世界で私に胜てる人とコードはまだ生まれていません。死ぬのが怖くなければ来てください。

    发表回复

    本版积分规则

    快速回复 返回顶部 返回列表