前文
Rxjs的资料还是比较匮乏的
最近挺感兴趣的
就没事研究一下
可能存在事实性错误
正文
我们可以看一段比较有意思的代码
const sourceA$ = scheduled([1, 2], queueScheduler);
const sourceB$ = scheduled([3, 4], queueScheduler);
const combine$ = combineLatest([sourceA$, sourceB$]);
combine$
.pipe(
subscribeOn(queueScheduler),
map(([a, b]) => {
return a + b;
})
)
.subscribe((result) => {
console.log("result2", result);
});
这个执行顺序是1+3|2+3|2+4
但是如果删掉subscribeOn(queueScheduler)
就会变成
2+3|2+4
为什么会导致这样?
queueScheduler是一个排队函数
而subscribeOn是指订阅时所用的方式
为什么订阅可以影响到最后得到的结果?
我们可以看一下他的源码
export function subscribeOn<T>(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
subscriber.add(scheduler.schedule(() =>{
source.subscribe(subscriber)
}, delay));
});
}
这里相当于一个科里化
subscribeOn函数传入的是调度器函数
operate函数内传入的是source和subscriber
source是该operate的父观察者
subscriber是operate的子订阅者
然后执行了
subscriber.add(scheduler.schedule(() =>{
source.subscribe(subscriber)
}, delay));
subscriber.add根据调试时一个null值,内部有一个条件检测,所以add在本次调试无用
可以直接忽略,那剩下的就是
scheduler.schedule(() =>{
source.subscribe(subscriber)
}, delay)
到底是不是这个函数起作用,我们可以试一下
const sourceA$ = scheduled([1, 2], queueScheduler)
const sourceB$ = scheduled([3, 4], queueScheduler)
const combine$ = combineLatest([sourceA$, sourceB$]);
const merge$ = combine$.pipe(
map(([a, b]) => {
return a + b;
})
);
const child = new SafeSubscriber(
(result) => {
console.log("result", result);
},
null,
null
);
queueScheduler.schedule(() => {
merge$.subscribe(child);
}, 0);
发现结果没有变
但是如果把queueScheduler.schedule这些删掉,结果就又错了
所以关键在于queueScheduler.schedule的调度问题
我们去看一下这里的源码
export const queueScheduler = new QueueScheduler(QueueAction);
可以看到初始化了一个QueueScheduler然后传入了QueueAction
class AsyncScheduler extends Scheduler {
constructor(SchedulerAction: typeof Action, now: () => number = Scheduler.now) {
super(SchedulerAction, now);
}
}
再往上看可以看到我们把QueueAction的构造函数赋值到了AsyncScheduler上的schedulerActionCtor上
class Scheduler implements SchedulerLike {
constructor(private schedulerActionCtor: typeof Action, now: () => number = Scheduler.now) {
this.now = now;
}
}
因为调用的是scheduler.schedule
public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
}
发现初始化了schedulerActionCtor并传入this和回调函数work,然后调用schedule传入state和delay
我们传入的是QueueAction,所以去看QueueAction
export class QueueAction<T> extends AsyncAction<T> {
constructor(protected scheduler: QueueScheduler, protected work: (this: SchedulerAction<T>, state?: T) => void) {
super(scheduler, work);
}
public schedule(state?: T, delay: number = 0): Subscription {
if (delay > 0) {
return super.schedule(state, delay);
}
this.delay = delay;
this.state = state;
this.scheduler.flush(this);
return this;
}
public execute(state: T, delay: number): any {
return delay > 0 || this.closed ? super.execute(state, delay) : this._execute(state, delay);
}
protected requestAsyncId(scheduler: QueueScheduler, id?: TimerHandle, delay: number = 0): TimerHandle {
// If delay exists and is greater than 0, or if the delay is null (the
// action wasn't rescheduled) but was originally scheduled as an async
// action, then recycle as an async action.
if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
return super.requestAsyncId(scheduler, id, delay);
}
// Otherwise flush the scheduler starting with this action.
scheduler.flush(this);
// HACK: In the past, this was returning `void`. However, `void` isn't a valid
// `TimerHandle`, and generally the return value here isn't really used. So the
// compromise is to return `0` which is both "falsy" and a valid `TimerHandle`,
// as opposed to refactoring every other instanceo of `requestAsyncId`.
return 0;
}
}
因为调用了他的schedule,可以发现她调用了this.scheduler.flush(this),把自身回传,scheduler则是刚才我们new this.schedulerActionCtor(this, work)传入的this,那再返回去看flush函数
public flush(action: AsyncAction<any>): void {
const { actions } = this;
if (this._active) {
actions.push(action);
return;
}
let error: any;
this._active = true;
do {
if ((error = action.execute(action.state, action.delay))) {
break;
}
} while ((action = actions.shift()!)); // exhaust the scheduler queue
this._active = false;
if (error) {
while ((action = actions.shift()!)) {
action.unsubscribe();
}
throw error;
}
}
总结一下,这里每次调用传入一个action任务
如果正在执行,就投入到数组尾部,如果没有执行,那就直接开始执行完队列
所以我们的
scheduler.schedule(() =>{
source.subscribe(subscriber)
}, delay)
正是影响了这里的flush队列
我们来模拟一下在scheduler.schedule里调用和不在scheduler.schedule调用的情况
在scheduler.schedule里调用
首先插入一个事件,负责订阅,订阅会传导到我们可以看一段比较有意思的代码
const sourceA$ = scheduled([1, 2], queueScheduler);
const sourceB$ = scheduled([3, 4], queueScheduler);
const combine$ = combineLatest([sourceA$, sourceB$]);
combine$
.pipe(
subscribeOn(queueScheduler),
map(([a, b]) => {
return a + b;
})
)
.subscribe((result) => {
console.log("result2", result);
});
这个执行顺序是1+3|2+3|2+4
但是如果删掉subscribeOn(queueScheduler)
就会变成
2+3|2+4
为什么会导致这样?
queueScheduler是一个排队函数
而subscribeOn是指订阅时所用的方式
为什么订阅可以影响到最后得到的结果?
我们可以看一下他的源码
export function subscribeOn<T>(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
subscriber.add(scheduler.schedule(() =>{
source.subscribe(subscriber)
}, delay));
});
}
这里相当于一个科里化
subscribeOn函数传入的是调度器函数
operate函数内传入的是source和subscriber
source是该operate的父观察者
subscriber是operate的子订阅者
然后执行了
subscriber.add(scheduler.schedule(() =>{
source.subscribe(subscriber)
}, delay));
subscriber.add根据调试时一个null值,内部有一个条件检测,所以add在本次调试无用
可以直接忽略,那剩下的就是
scheduler.schedule(() =>{
source.subscribe(subscriber)
}, delay)
到底是不是这个函数起作用,我们可以试一下
const sourceA$ = scheduled([1, 2], queueScheduler)
const sourceB$ = scheduled([3, 4], queueScheduler)
const combine$ = combineLatest([sourceA$, sourceB$]);
const merge$ = combine$.pipe(
map(([a, b]) => {
return a + b;
})
);
const child = new SafeSubscriber(
(result) => {
console.log("result", result);
},
null,
null
);
queueScheduler.schedule(() => {
merge$.subscribe(child);
}, 0);
发现结果没有变
但是如果把queueScheduler.schedule这些删掉,结果就又错了
所以关键在于queueScheduler.schedule的调度问题
我们去看一下这里的源码
export const queueScheduler = new QueueScheduler(QueueAction);
可以看到初始化了一个QueueScheduler然后传入了QueueAction
class AsyncScheduler extends Scheduler {
constructor(SchedulerAction: typeof Action, now: () => number = Scheduler.now) {
super(SchedulerAction, now);
}
}
再往上看可以看到我们把QueueAction的构造函数赋值到了AsyncScheduler上的schedulerActionCtor上
class Scheduler implements SchedulerLike {
constructor(private schedulerActionCtor: typeof Action, now: () => number = Scheduler.now) {
this.now = now;
}
}
因为调用的是scheduler.schedule
public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
}
发现初始化了schedulerActionCtor并传入this和回调函数work,然后调用schedule传入state和delay
我们传入的是QueueAction,所以去看QueueAction
export class QueueAction<T> extends AsyncAction<T> {
constructor(protected scheduler: QueueScheduler, protected work: (this: SchedulerAction<T>, state?: T) => void) {
super(scheduler, work);
}
public schedule(state?: T, delay: number = 0): Subscription {
if (delay > 0) {
return super.schedule(state, delay);
}
this.delay = delay;
this.state = state;
this.scheduler.flush(this);
return this;
}
public execute(state: T, delay: number): any {
return delay > 0 || this.closed ? super.execute(state, delay) : this._execute(state, delay);
}
protected requestAsyncId(scheduler: QueueScheduler, id?: TimerHandle, delay: number = 0): TimerHandle {
// If delay exists and is greater than 0, or if the delay is null (the
// action wasn't rescheduled) but was originally scheduled as an async
// action, then recycle as an async action.
if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
return super.requestAsyncId(scheduler, id, delay);
}
// Otherwise flush the scheduler starting with this action.
scheduler.flush(this);
// HACK: In the past, this was returning `void`. However, `void` isn't a valid
// `TimerHandle`, and generally the return value here isn't really used. So the
// compromise is to return `0` which is both "falsy" and a valid `TimerHandle`,
// as opposed to refactoring every other instanceo of `requestAsyncId`.
return 0;
}
}
因为调用了他的schedule,可以发现她调用了this.scheduler.flush(this),把自身回传,scheduler则是刚才我们new this.schedulerActionCtor(this, work)传入的this,那再返回去看flush函数
public flush(action: AsyncAction<any>): void {
const { actions } = this;
if (this._active) {
actions.push(action);
return;
}
let error: any;
this._active = true;
do {
if ((error = action.execute(action.state, action.delay))) {
break;
}
} while ((action = actions.shift()!)); // exhaust the scheduler queue
this._active = false;
if (error) {
while ((action = actions.shift()!)) {
action.unsubscribe();
}
throw error;
}
}
总结一下,这里每次调用传入一个action任务
如果正在执行,就投入到数组尾部,如果没有执行,那就直接开始执行完队列
所以我们的
scheduler.schedule(() =>{
source.subscribe(subscriber)
}, delay)
正是影响了这里的flush队列
我们来模拟一下在scheduler.schedule里调用和不在scheduler.schedule调用的情况
在scheduler.schedule里调用
首先插入一个事件,负责订阅,订阅会传导到sourceA和sourceB
因为A和B都使用了scheduled(xxx, queueScheduler)
所以也会使用queueScheduler来做递归任务
那么此时会出现任务队列
订阅事件回调(订阅A以及B)
sourceA提取数据
sourceB提取数据
执行完订阅就会删除,并且按顺序一个一个执行
因为sourceA提取后还会继续插入队列
所以接下来的是
订阅事件回调(删除)
sourceA提取数据
sourceB提取数据
sourceA提取数据
sourceB提取数据
空数组
按照我们所想的顺序执行了~
不在scheduler.schedule里调用
直接在主线程进行订阅,而在combineLatest([sourceA$, sourceB$])会按顺序进行订阅,先订阅A
因为此时不存在action,所以立即执行a的第一个任务
a执行第一个任务后会立刻插入一个任务
直到执行完成后,才开始订阅B任务,所以是
订阅事件回调(订阅A)
sourceA提取数据
sourceA提取数据
订阅事件回调(订阅B)
sourceB提取数据
sourceB提取数据
空数组
因为combineLatest的特性,两个必须同时存在数据才有返回
所以A最后的结尾值是2,而B会按顺序触发3,4,自然得到了2+3,2+4两个值
那么我们就彻底理解了Scheduler理论~
结语
撒花~