李恒道 posted on 2023-1-21 06:48:27

RxJS concatMap source code walkthrough

concatMap subscribes to the next inner observable only after the previous one has completed.
This time, let's take a look at its source code.
```js
export function concatMap<T, R, O extends ObservableInput<any>>(
project: (value: T, index: number) => O,
resultSelector?: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R
): OperatorFunction<T, ObservedValueOf<O> | R> {
return isFunction(resultSelector) ? mergeMap(project, resultSelector, 1) : mergeMap(project, 1);
}
```
The second argument (resultSelector) usually isn't passed,
so let's go straight to mergeMap(project, 1).
```js
export function mergeMap<T, R, O extends ObservableInput<any>>(
project: (value: T, index: number) => O,
resultSelector?: ((outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R) | number,
concurrent: number = Infinity
): OperatorFunction<T, ObservedValueOf<O> | R> {
if (isFunction(resultSelector)) {
    // DEPRECATED PATH
    return mergeMap((a, i) => map((b: any, ii: number) => resultSelector(a, b, i, ii))(innerFrom(project(a, i))), concurrent);
} else if (typeof resultSelector === 'number') {
    concurrent = resultSelector;
}

return operate((source, subscriber) => mergeInternals(source, subscriber, project, concurrent));
}

```
mergeMap shifts its arguments here: when resultSelector is a number, that value is assigned to concurrent.
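To see the shift in isolation, here is a minimal sketch. `resolveConcurrent` and the `ResultSelector` alias are hypothetical names made up for this post, not RxJS APIs; the sketch only mirrors the `typeof resultSelector === 'number'` branch and ignores the deprecated selector path.

```ts
// Hypothetical helper, not part of RxJS: it isolates the argument-shifting
// step that mergeMap performs on its second and third parameters.
type ResultSelector = (outer: unknown, inner: unknown, outerIndex: number, innerIndex: number) => unknown;

function resolveConcurrent(
  resultSelector?: ResultSelector | number,
  concurrent: number = Infinity
): number {
  // A number in the second slot is really the concurrency limit.
  if (typeof resultSelector === 'number') {
    return resultSelector;
  }
  // Otherwise the third argument (or its default) is used.
  return concurrent;
}

// Mirrors mergeMap(project, 1), which is what concatMap calls.
const fromConcatMap = resolveConcurrent(1);
// Mirrors mergeMap(project): no limit was given.
const fromDefault = resolveConcurrent();
// Mirrors mergeMap(project, selector, 1): the limit is in the third slot.
const withSelector = resolveConcurrent((a, b) => [a, b], 1);
```

So for concatMap the limit that reaches mergeInternals is 1, whichever of the two call shapes is used.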
It then calls operate((source, subscriber) => mergeInternals(source, subscriber, project, concurrent)),
so let's continue with mergeInternals(source, subscriber, project, concurrent).
Source first, analysis after.
```js
export function mergeInternals<T, R>(
source: Observable<T>,
subscriber: Subscriber<R>,
project: (value: T, index: number) => ObservableInput<R>,
concurrent: number,
onBeforeNext?: (innerValue: R) => void,
expand?: boolean,
innerSubScheduler?: SchedulerLike,
additionalFinalizer?: () => void
) {
// Buffered values, in the event of going over our concurrency limit
const buffer: T[] = [];
// The number of active inner subscriptions.
let active = 0;
// An index to pass to our accumulator function
let index = 0;
// Whether or not the outer source has completed.
let isComplete = false;

/**
   * Checks to see if we can complete our result or not.
   */
const checkComplete = () => {
    // If the outer has completed, and nothing is left in the buffer,
    // and we don't have any active inner subscriptions, then we can
    // Emit the state and complete.
    if (isComplete && !buffer.length && !active) {
      subscriber.complete();
    }
};

// If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait.
const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));

const doInnerSub = (value: T) => {
    // If we're expanding, we need to emit the outer values and the inner values
    // as the inners will "become outers" in a way as they are recursively fed
    // back to the projection mechanism.
    expand && subscriber.next(value as any);

    // Increment the number of active subscriptions so we can track it
    // against our concurrency limit later.
    active++;

    // A flag used to show that the inner observable completed.
    // This is checked during finalization to see if we should
    // move to the next item in the buffer, if there is on.
    let innerComplete = false;

    // Start our inner subscription.
    innerFrom(project(value, index++)).subscribe(
      createOperatorSubscriber(
      subscriber,
      (innerValue) => {
          // `mergeScan` has additional handling here. For example
          // taking the inner value and updating state.
          onBeforeNext?.(innerValue);

          if (expand) {
            // If we're expanding, then just recurse back to our outer
            // handler. It will emit the value first thing.
            outerNext(innerValue as any);
          } else {
            // Otherwise, emit the inner value.
            subscriber.next(innerValue);
          }
      },
      () => {
          // Flag that we have completed, so we know to check the buffer
          // during finalization.
          innerComplete = true;
      },
      // Errors are passed to the destination.
      undefined,
      () => {
          // During finalization, if the inner completed (it wasn't errored or
          // cancelled), then we want to try the next item in the buffer if
          // there is one.
          if (innerComplete) {
            // We have to wrap this in a try/catch because it happens during
            // finalization, possibly asynchronously, and we want to pass
            // any errors that happen (like in a projection function) to
            // the outer Subscriber.
            try {
            // INNER SOURCE COMPLETE
            // Decrement the active count to ensure that the next time
            // we try to call `doInnerSub`, the number is accurate.
            active--;
            // If we have more values in the buffer, try to process those
            // Note that this call will increment `active` ahead of the
            // next conditional, if there were any more inner subscriptions
            // to start.
            while (buffer.length && active < concurrent) {
                const bufferedValue = buffer.shift()!;
                // Particularly for `expand`, we need to check to see if a scheduler was provided
                // for when we want to start our inner subscription. Otherwise, we just start
                // are next inner subscription.
                if (innerSubScheduler) {
                  executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
                } else {
                  doInnerSub(bufferedValue);
                }
            }
            // Check to see if we can complete, and complete if so.
            checkComplete();
            } catch (err) {
            subscriber.error(err);
            }
          }
      }
      )
    );
};

// Subscribe to our source observable.
source.subscribe(
    createOperatorSubscriber(subscriber, outerNext, () => {
      // Outer completed, make a note of it, and check to see if we can complete everything.
      isComplete = true;
      checkComplete();
    })
);

// Additional finalization (for when the destination is torn down).
// Other finalization is added implicitly via subscription above.
return () => {
    additionalFinalizer?.();
};
}
```
It first declares some state variables and helper functions, then calls
```js
// Subscribe to our source observable.
source.subscribe(
    createOperatorSubscriber(subscriber, outerNext, () => {
      // Outer completed, make a note of it, and check to see if we can complete everything.
      isComplete = true;
      checkComplete();
    })
);
```
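This subscribes to the outer (source) observable. createOperatorSubscriber receives the downstream subscriber, the next handler, and the complete handler.
Whenever the outer source emits, outerNext is called back, so let's look at it.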
```js
const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));
```
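This checks whether the number of active inner subscriptions is below the concurrency limit (1 for concatMap). If it is, doInnerSub is called; otherwise the value is buffered.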
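The admission check can be sketched on its own, roughly like this. `createGate` and the `started` array are illustrative assumptions; the real doInnerSub subscribes to an inner observable, while this version only records which values would have started.

```ts
// A simplified model of the admission check in outerNext. Names mirror the
// source, but this is an illustrative sketch and not the RxJS code itself.
function createGate<T>(concurrent: number) {
  const buffer: T[] = [];  // values waiting for a free slot
  const started: T[] = []; // values whose inner subscription has started
  let active = 0;          // number of running inner subscriptions

  const doInnerSub = (value: T): void => {
    active++;
    started.push(value);
  };

  // Same decision as outerNext: start now if under the limit, else buffer.
  const outerNext = (value: T): void => {
    if (active < concurrent) {
      doInnerSub(value);
    } else {
      buffer.push(value);
    }
  };

  return { outerNext, buffer, started };
}

// concatMap passes a limit of 1, so only the first value starts right away.
const gate = createGate<string>(1);
['a', 'b', 'c'].forEach((v) => gate.outerNext(v));

// With the mergeMap default of Infinity nothing is ever buffered.
const openGate = createGate<string>(Infinity);
['a', 'b', 'c'].forEach((v) => openGate.outerNext(v));
```

With a limit of 1, `gate.started` holds only 'a' and `gate.buffer` holds 'b' and 'c', which is exactly the queue concatMap works through one by one.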
Let's go straight to doInnerSub.
```js
const doInnerSub = (value: T) => {
    // If we're expanding, we need to emit the outer values and the inner values
    // as the inners will "become outers" in a way as they are recursively fed
    // back to the projection mechanism.
    expand && subscriber.next(value as any);

    // Increment the number of active subscriptions so we can track it
    // against our concurrency limit later.
    active++;

    // A flag used to show that the inner observable completed.
    // This is checked during finalization to see if we should
    // move to the next item in the buffer, if there is on.
    let innerComplete = false;

    // Start our inner subscription.
    innerFrom(project(value, index++)).subscribe(
      createOperatorSubscriber(
      subscriber,
      (innerValue) => {
          // `mergeScan` has additional handling here. For example
          // taking the inner value and updating state.
          onBeforeNext?.(innerValue);

          if (expand) {
            // If we're expanding, then just recurse back to our outer
            // handler. It will emit the value first thing.
            outerNext(innerValue as any);
          } else {
            // Otherwise, emit the inner value.
            subscriber.next(innerValue);
          }
      },
      () => {
          // Flag that we have completed, so we know to check the buffer
          // during finalization.
          innerComplete = true;
      },
      // Errors are passed to the destination.
      undefined,
      () => {
          // During finalization, if the inner completed (it wasn't errored or
          // cancelled), then we want to try the next item in the buffer if
          // there is one.
          if (innerComplete) {
            // We have to wrap this in a try/catch because it happens during
            // finalization, possibly asynchronously, and we want to pass
            // any errors that happen (like in a projection function) to
            // the outer Subscriber.
            try {
            // INNER SOURCE COMPLETE
            // Decrement the active count to ensure that the next time
            // we try to call `doInnerSub`, the number is accurate.
            active--;
            // If we have more values in the buffer, try to process those
            // Note that this call will increment `active` ahead of the
            // next conditional, if there were any more inner subscriptions
            // to start.
            while (buffer.length && active < concurrent) {
                const bufferedValue = buffer.shift()!;
                // Particularly for `expand`, we need to check to see if a scheduler was provided
                // for when we want to start our inner subscription. Otherwise, we just start
                // are next inner subscription.
                if (innerSubScheduler) {
                  executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
                } else {
                  doInnerSub(bufferedValue);
                }
            }
            // Check to see if we can complete, and complete if so.
            checkComplete();
            } catch (err) {
            subscriber.error(err);
            }
          }
      }
      )
    );
};
```
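expand can be ignored; it is not part of the main flow.
active is the number of active inner subscriptions.
innerComplete is a flag that records whether the inner observable has completed; it starts out false.
innerFrom(project(value, index++)) converts whatever our project function returns into an Observable.
createOperatorSubscriber is fairly simple; it just wraps a constructor call.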
```js
export function createOperatorSubscriber<T>(
destination: Subscriber<any>,
onNext?: (value: T) => void,
onComplete?: () => void,
onError?: (err: any) => void,
onFinalize?: () => void
): Subscriber<T> {
return new OperatorSubscriber(destination, onNext, onComplete, onError, onFinalize);
}
```
When the inner observable we returned emits a value, execution reaches subscriber.next(innerValue);
```js
      (innerValue) => {
          // `mergeScan` has additional handling here. For example
          // taking the inner value and updating state.
          onBeforeNext?.(innerValue);

          if (expand) {
            // If we're expanding, then just recurse back to our outer
            // handler. It will emit the value first thing.
            outerNext(innerValue as any);
          } else {
            // Otherwise, emit the inner value.
            subscriber.next(innerValue);
          }
      }
```
When the inner observable completes, the completion wrapper that OperatorSubscriber builds around onComplete runs
```js
function (this: OperatorSubscriber<T>) {
          try {
            onComplete();
          } catch (err) {
            // Send any errors that occur down stream.
            destination.error(err);
          } finally {
            // Ensure finalization.
            this.unsubscribe();
          }
      }
```
It calls the onComplete function we passed in, namely
```js
      () => {
          // Flag that we have completed, so we know to check the buffer
          // during finalization.
          innerComplete = true;
      },
```
Then it calls this.unsubscribe(), which is
```js
unsubscribe() {
    if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) {
      const { closed } = this;
      super.unsubscribe();
      // Execute additional teardown if we have any and we didn't already do so.
      !closed && this.onFinalize?.();
    }
}
```
Since shouldUnsubscribe isn't passed, the guard always passes, so this.onFinalize is called as long as the subscriber wasn't already closed.
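The ordering matters here: onComplete sets the flag first, and only then does the finalizer run and read it. A rough sketch of just that ordering is below. `MiniOperatorSubscriber` is a made-up stand-in, not the real OperatorSubscriber; it leaves out the destination, the shouldUnsubscribe guard, and the catch that forwards errors downstream.

```ts
// Illustrative stand-in for OperatorSubscriber, reduced to the two hooks
// discussed above. The class name and its shape are assumptions.
class MiniOperatorSubscriber {
  closed = false;

  constructor(
    private onComplete: () => void,
    private onFinalize: () => void
  ) {}

  complete(): void {
    try {
      this.onComplete();
    } finally {
      // Always tear down after completion, as in the source.
      this.unsubscribe();
    }
  }

  unsubscribe(): void {
    const wasClosed = this.closed;
    this.closed = true;
    // Run the finalizer only if we were not already closed.
    if (!wasClosed) {
      this.onFinalize();
    }
  }
}

const calls: string[] = [];
let innerComplete = false;

const sub = new MiniOperatorSubscriber(
  () => {
    innerComplete = true;
    calls.push('complete');
  },
  () => {
    calls.push(`finalize (innerComplete=${innerComplete})`);
  }
);

sub.complete();
sub.unsubscribe(); // second teardown is a no-op
```

If unsubscribe were called without complete (a cancellation), the finalizer would still run but would see innerComplete as false, which is why the real finalizer checks that flag before touching the buffer.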
That takes us to the finalizer passed in doInnerSub:
```js
      () => {
          // During finalization, if the inner completed (it wasn't errored or
          // cancelled), then we want to try the next item in the buffer if
          // there is one.
          if (innerComplete) {
            // We have to wrap this in a try/catch because it happens during
            // finalization, possibly asynchronously, and we want to pass
            // any errors that happen (like in a projection function) to
            // the outer Subscriber.
            try {
            // INNER SOURCE COMPLETE
            // Decrement the active count to ensure that the next time
            // we try to call `doInnerSub`, the number is accurate.
            active--;
            // If we have more values in the buffer, try to process those
            // Note that this call will increment `active` ahead of the
            // next conditional, if there were any more inner subscriptions
            // to start.
            while (buffer.length && active < concurrent) {
                const bufferedValue = buffer.shift()!;
                // Particularly for `expand`, we need to check to see if a scheduler was provided
                // for when we want to start our inner subscription. Otherwise, we just start
                // are next inner subscription.
                if (innerSubScheduler) {
                  executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
                } else {
                  doInnerSub(bufferedValue);
                }
            }
            // Check to see if we can complete, and complete if so.
            checkComplete();
            } catch (err) {
            subscriber.error(err);
            }
          }
      }
```
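It checks whether the inner observable completed, then drains the buffer: while there are buffered values and a free slot, it starts the next inner subscription. Finally, checkComplete does the wrap-up.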
```js
const checkComplete = () => {
    // If the outer has completed, and nothing is left in the buffer,
    // and we don't have any active inner subscriptions, then we can
    // Emit the state and complete.
    if (isComplete && !buffer.length && !active) {
      subscriber.complete();
    }
};
```
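This checks three things: that there are no active inner subscriptions, that the buffer is empty, and that the outer source has completed (isComplete). If all three hold, it calls subscriber.complete().
Note that this function has another call site:
the outer source's complete handler sets isComplete to true and then calls checkComplete as well.
The core idea is that the result completes only after the outer source has completed and every inner stream it produced has completed too.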
```js
// Subscribe to our source observable.
source.subscribe(
    createOperatorSubscriber(subscriber, outerNext, () => {
      // Outer completed, make a note of it, and check to see if we can complete everything.
      isComplete = true;
      checkComplete();
    })
);
```
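To tie the pieces together, here is a toy, fully synchronous model of the flow. It is only a sketch under heavy assumptions: `miniMergeInternals`, `outerComplete`, and `finishInner` are names invented for this post, inner streams are "completed" by hand, and expand, schedulers, and error handling are all left out.

```ts
// A toy, synchronous model of mergeInternals. Inner "streams" are finished
// by hand through finishInner. All names here are illustrative.
function miniMergeInternals(concurrent: number) {
  const log: string[] = [];
  const buffer: string[] = [];
  let active = 0;
  let isComplete = false;

  // Same three conditions as checkComplete in the source.
  const checkComplete = (): void => {
    if (isComplete && !buffer.length && !active) {
      log.push('result complete');
    }
  };

  const doInnerSub = (value: string): void => {
    active++;
    log.push(`subscribe inner ${value}`);
  };

  return {
    log,
    // The outer source emits a value.
    outerNext(value: string): void {
      if (active < concurrent) doInnerSub(value);
      else buffer.push(value);
    },
    // The outer source completes.
    outerComplete(): void {
      isComplete = true;
      checkComplete();
    },
    // One running inner completes and is finalized.
    finishInner(): void {
      active--;
      while (buffer.length && active < concurrent) {
        doInnerSub(buffer.shift()!);
      }
      checkComplete();
    },
  };
}

const model = miniMergeInternals(1); // concatMap uses a limit of 1
model.outerNext('a');
model.outerNext('b');
model.outerComplete(); // outer is done, but 'a' is running and 'b' is buffered
model.finishInner();   // 'a' ends, so 'b' is subscribed
model.finishInner();   // 'b' ends, so the result completes
```

In this run the outer source completes early, yet 'result complete' is logged only after both inner streams have been finished, and inner 'b' is not subscribed until 'a' is done.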
And with that, we've read through the source of concatMap~
# Conclusion
Hooray~