上一主题 下一主题
ScriptCat,新一代的脚本管理器脚本站,与全世界分享你的用户脚本油猴脚本开发指南教程目录
返回列表 发新帖

Rxjs concatMap源码解读

[复制链接]
  • TA的每日心情
    开心
    昨天 23:59
  • 签到天数: 170 天

    [LV.7]常住居民III

    470

    主题

    4281

    帖子

    4322

    积分

    管理员

    非物质文化遗产社会摇传承人

    Rank: 10Rank: 10Rank: 10

    积分
    4322

    喜迎中秋国庆纪念章荣誉开发者管理员油中2周年

    发表于 2023-1-21 06:48:27 | 显示全部楼层 | 阅读模式

    concatMap 在前一个内部observable 完成后才会订阅下一个
    这次我们来研究一下他的源码

    export function concatMap<T, R, O extends ObservableInput<any>>(
      project: (value: T, index: number) => O,
      resultSelector?: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R
    ): OperatorFunction<T, ObservedValueOf<O> | R> {
      return isFunction(resultSelector) ? mergeMap(project, resultSelector, 1) : mergeMap(project, 1);
    }

    一般来说不传第二个
    我们直接去看mergeMap(project, 1)

    export function mergeMap<T, R, O extends ObservableInput<any>>(
      project: (value: T, index: number) => O,
      resultSelector?: ((outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R) | number,
      concurrent: number = Infinity
    ): OperatorFunction<T, ObservedValueOf<O> | R> {
      if (isFunction(resultSelector)) {
        // DEPRECATED PATH
        return mergeMap((a, i) => map((b: any, ii: number) => resultSelector(a, b, i, ii))(innerFrom(project(a, i))), concurrent);
      } else if (typeof resultSelector === 'number') {
        concurrent = resultSelector;
      }
    
      return operate((source, subscriber) => mergeInternals(source, subscriber, project, concurrent));
    }
    

    这里堆resultSelector做了一次挪位,把数据赋值到了concurrent
    然后调用operate((source, subscriber) => mergeInternals(source, subscriber, project, concurrent))
    我们继续去看mergeInternals(source, subscriber, project, concurrent))
    先贴源码再分析

    export function mergeInternals<T, R>(
      source: Observable<T>,
      subscriber: Subscriber<R>,
      project: (value: T, index: number) => ObservableInput<R>,
      concurrent: number,
      onBeforeNext?: (innerValue: R) => void,
      expand?: boolean,
      innerSubScheduler?: SchedulerLike,
      additionalFinalizer?: () => void
    ) {
      // Buffered values, in the event of going over our concurrency limit
      const buffer: T[] = [];
      // The number of active inner subscriptions.
      let active = 0;
      // An index to pass to our accumulator function
      let index = 0;
      // Whether or not the outer source has completed.
      let isComplete = false;
    
      /**
       * Checks to see if we can complete our result or not.
       */
      const checkComplete = () => {
        // If the outer has completed, and nothing is left in the buffer,
        // and we don't have any active inner subscriptions, then we can
        // Emit the state and complete.
        if (isComplete && !buffer.length && !active) {
          subscriber.complete();
        }
      };
    
      // If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait.
      const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));
    
      const doInnerSub = (value: T) => {
        // If we're expanding, we need to emit the outer values and the inner values
        // as the inners will "become outers" in a way as they are recursively fed
        // back to the projection mechanism.
        expand && subscriber.next(value as any);
    
        // Increment the number of active subscriptions so we can track it
        // against our concurrency limit later.
        active++;
    
        // A flag used to show that the inner observable completed.
        // This is checked during finalization to see if we should
        // move to the next item in the buffer, if there is on.
        let innerComplete = false;
    
        // Start our inner subscription.
        innerFrom(project(value, index++)).subscribe(
          createOperatorSubscriber(
            subscriber,
            (innerValue) => {
              // `mergeScan` has additional handling here. For example
              // taking the inner value and updating state.
              onBeforeNext?.(innerValue);
    
              if (expand) {
                // If we're expanding, then just recurse back to our outer
                // handler. It will emit the value first thing.
                outerNext(innerValue as any);
              } else {
                // Otherwise, emit the inner value.
                subscriber.next(innerValue);
              }
            },
            () => {
              // Flag that we have completed, so we know to check the buffer
              // during finalization.
              innerComplete = true;
            },
            // Errors are passed to the destination.
            undefined,
            () => {
              // During finalization, if the inner completed (it wasn't errored or
              // cancelled), then we want to try the next item in the buffer if
              // there is one.
              if (innerComplete) {
                // We have to wrap this in a try/catch because it happens during
                // finalization, possibly asynchronously, and we want to pass
                // any errors that happen (like in a projection function) to
                // the outer Subscriber.
                try {
                  // INNER SOURCE COMPLETE
                  // Decrement the active count to ensure that the next time
                  // we try to call `doInnerSub`, the number is accurate.
                  active--;
                  // If we have more values in the buffer, try to process those
                  // Note that this call will increment `active` ahead of the
                  // next conditional, if there were any more inner subscriptions
                  // to start.
                  while (buffer.length && active < concurrent) {
                    const bufferedValue = buffer.shift()!;
                    // Particularly for `expand`, we need to check to see if a scheduler was provided
                    // for when we want to start our inner subscription. Otherwise, we just start
                    // are next inner subscription.
                    if (innerSubScheduler) {
                      executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
                    } else {
                      doInnerSub(bufferedValue);
                    }
                  }
                  // Check to see if we can complete, and complete if so.
                  checkComplete();
                } catch (err) {
                  subscriber.error(err);
                }
              }
            }
          )
        );
      };
    
      // Subscribe to our source observable.
      source.subscribe(
        createOperatorSubscriber(subscriber, outerNext, () => {
          // Outer completed, make a note of it, and check to see if we can complete everything.
          isComplete = true;
          checkComplete();
        })
      );
    
      // Additional finalization (for when the destination is torn down).
      // Other finalization is added implicitly via subscription above.
      return () => {
        additionalFinalizer?.();
      };
    }

    首先声明了一些初始化变量和函数,然后调用

      // Subscribe to our source observable.
      source.subscribe(
        createOperatorSubscriber(subscriber, outerNext, () => {
          // Outer completed, make a note of it, and check to see if we can complete everything.
          isComplete = true;
          checkComplete();
        })
      );

    这里是对父订阅,参数是子订阅者和next函数,以及complete函数
    如果父触发了就会回调outerNext,我们去看看源码

      const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));

    这里是一个事件判断,是否活跃事件小于最大事件,如果是则调用doInnerSub,不是则缓存一下
    我们直接去看一下doInnerSub

      const doInnerSub = (value: T) => {
        // If we're expanding, we need to emit the outer values and the inner values
        // as the inners will "become outers" in a way as they are recursively fed
        // back to the projection mechanism.
        expand && subscriber.next(value as any);
    
        // Increment the number of active subscriptions so we can track it
        // against our concurrency limit later.
        active++;
    
        // A flag used to show that the inner observable completed.
        // This is checked during finalization to see if we should
        // move to the next item in the buffer, if there is on.
        let innerComplete = false;
    
        // Start our inner subscription.
        innerFrom(project(value, index++)).subscribe(
          createOperatorSubscriber(
            subscriber,
            (innerValue) => {
              // `mergeScan` has additional handling here. For example
              // taking the inner value and updating state.
              onBeforeNext?.(innerValue);
    
              if (expand) {
                // If we're expanding, then just recurse back to our outer
                // handler. It will emit the value first thing.
                outerNext(innerValue as any);
              } else {
                // Otherwise, emit the inner value.
                subscriber.next(innerValue);
              }
            },
            () => {
              // Flag that we have completed, so we know to check the buffer
              // during finalization.
              innerComplete = true;
            },
            // Errors are passed to the destination.
            undefined,
            () => {
              // During finalization, if the inner completed (it wasn't errored or
              // cancelled), then we want to try the next item in the buffer if
              // there is one.
              if (innerComplete) {
                // We have to wrap this in a try/catch because it happens during
                // finalization, possibly asynchronously, and we want to pass
                // any errors that happen (like in a projection function) to
                // the outer Subscriber.
                try {
                  // INNER SOURCE COMPLETE
                  // Decrement the active count to ensure that the next time
                  // we try to call `doInnerSub`, the number is accurate.
                  active--;
                  // If we have more values in the buffer, try to process those
                  // Note that this call will increment `active` ahead of the
                  // next conditional, if there were any more inner subscriptions
                  // to start.
                  while (buffer.length && active < concurrent) {
                    const bufferedValue = buffer.shift()!;
                    // Particularly for `expand`, we need to check to see if a scheduler was provided
                    // for when we want to start our inner subscription. Otherwise, we just start
                    // are next inner subscription.
                    if (innerSubScheduler) {
                      executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
                    } else {
                      doInnerSub(bufferedValue);
                    }
                  }
                  // Check to see if we can complete, and complete if so.
                  checkComplete();
                } catch (err) {
                  subscriber.error(err);
                }
              }
            }
          )
        );
      };

    expand忽略,不属于主流程
    active是活跃事件数
    innerComplete内部未完成标识
    innerFrom(project(value, index++))最后返回了我们自定义函数返回的一个观察者
    createOperatorSubscriber内的则较为简单,只是一个函数封装

    export function createOperatorSubscriber<T>(
      destination: Subscriber<any>,
      onNext?: (value: T) => void,
      onComplete?: () => void,
      onError?: (err: any) => void,
      onFinalize?: () => void
    ): Subscriber<T> {
      return new OperatorSubscriber(destination, onNext, onComplete, onError, onFinalize);
    }

    如果我们返回的观察者调用了next则会走到subscriber.next(innerValue);中

            (innerValue) => {
              // `mergeScan` has additional handling here. For example
              // taking the inner value and updating state.
              onBeforeNext?.(innerValue);
    
              if (expand) {
                // If we're expanding, then just recurse back to our outer
                // handler. It will emit the value first thing.
                outerNext(innerValue as any);
              } else {
                // Otherwise, emit the inner value.
                subscriber.next(innerValue);
              }
            }

    当观察者流结束则会调用createOperatorSubscriber内部的onComplete函数

    function (this: OperatorSubscriber<T>) {
              try {
                onComplete();
              } catch (err) {
                // Send any errors that occur down stream.
                destination.error(err);
              } finally {
                // Ensure finalization.
                this.unsubscribe();
              }
            }

    发现调用了我们自身的onComplete函数也就是

            () => {
              // Flag that we have completed, so we know to check the buffer
              // during finalization.
              innerComplete = true;
            },

    然后调用了this.unsubscribe();也就是

      unsubscribe() {
        if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) {
          const { closed } = this;
          super.unsubscribe();
          // Execute additional teardown if we have any and we didn't already do so.
          !closed && this.onFinalize?.();
        }
      }

    因为shouldUnsubscribe 没传,所以一定会调用到 this.onFinalize
    就会走到

            () => {
              // During finalization, if the inner completed (it wasn't errored or
              // cancelled), then we want to try the next item in the buffer if
              // there is one.
              if (innerComplete) {
                // We have to wrap this in a try/catch because it happens during
                // finalization, possibly asynchronously, and we want to pass
                // any errors that happen (like in a projection function) to
                // the outer Subscriber.
                try {
                  // INNER SOURCE COMPLETE
                  // Decrement the active count to ensure that the next time
                  // we try to call `doInnerSub`, the number is accurate.
                  active--;
                  // If we have more values in the buffer, try to process those
                  // Note that this call will increment `active` ahead of the
                  // next conditional, if there were any more inner subscriptions
                  // to start.
                  while (buffer.length && active < concurrent) {
                    const bufferedValue = buffer.shift()!;
                    // Particularly for `expand`, we need to check to see if a scheduler was provided
                    // for when we want to start our inner subscription. Otherwise, we just start
                    // are next inner subscription.
                    if (innerSubScheduler) {
                      executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
                    } else {
                      doInnerSub(bufferedValue);
                    }
                  }
                  // Check to see if we can complete, and complete if so.
                  checkComplete();
                } catch (err) {
                  subscriber.error(err);
                }
              }
            }

    判断是否已完成,然后递归buffer,如果存在buffer就开始下一轮,然后最后进行checkComplete收尾工作

      const checkComplete = () => {
        // If the outer has completed, and nothing is left in the buffer,
        // and we don't have any active inner subscriptions, then we can
        // Emit the state and complete.
        if (isComplete && !buffer.length && !active) {
          subscriber.complete();
        }
      };

    这里进行了活跃判断,buffer缓存长度以及是否订阅了父观察者,都符合则调用完成
    注意这个函数还有其他调用
    在对父观察者进行调用的时候也进行了判断isComplete为true以及检查checkComplete
    核心思路就是必须等待父关闭,以及所有的产生的流也都关闭。

      // Subscribe to our source observable.
      source.subscribe(
        createOperatorSubscriber(subscriber, outerNext, () => {
          // Outer completed, make a note of it, and check to see if we can complete everything.
          isComplete = true;
          checkComplete();
        })
      );

    那到这里我们就阅读了concatMap的源码啦~

    结语

    撒花~

    混的人。
    ------------------------------------------
    進撃!永遠の帝国の破壊虎---李恒道
    个人宣言:この世界で私に胜てる人とコードはまだ生まれていません。死ぬのが怖くなければ来てください。

    发表回复

    本版积分规则

    快速回复 返回顶部 返回列表