redux-saga源码解析

Redux-saga是redux应用的又一个副作用模型。可以用来替换redux-thunk中间件。
redux-saga 抽象出 Effect (影响, 例如等待action、发出action、fetch数据等等),便于组合与测试。

我想在分析redux-saga之前,先来看看redux-thunk是怎么一回事
redux 在我之前一篇文章中讲过了链接
那我们就先用 redux-thunk 来写一个 asyncTodo 的demo

redux-thunk 分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import { createStore, applyMiddleware } from 'redux';
const thunk = ({ dispatch, getState }) => next => action => {
if (typeof action === 'function') {
return action(dispatch, getState);
}

return next(action);
}

const logger = ({ getState }) => next => action => {
console.log('will dispatch', getState());
next(action)
console.log('state after dispatch', getState());
}

const todos = (state = [], action) => {
switch (action.type) {
case 'ADD_TODO':
return [
...state,
action.text
];
default:
return state
}
}

const store = createStore(
todos,
['Use Redux'],
applyMiddleware(logger, thunk),
);

store.dispatch(dispatch => {
setTimeout(() => {
dispatch({ type: 'ADD_TODO', text: 'Read the docs' });
}, 1000);
});

原本redux中action只能是 plain object ,redux-thunk使action可以为function。当我们想抛出一个异步的action时,其实我们是把异步的处理放在了actionCreator中。
这样就会导致action形式不统一,并且对于异步的处理将会分散到各个action中,不利于维护。
接下来看看redux-saga是如何实现的

redux-saga

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import { createStore, applyMiddleware } from 'redux';
import createSagaMiddleware from 'redux-saga';
import { put, take, fork, delay } from './redux-saga/effects'
import { delay as delayUtil } from 'redux-saga/utils';

// 获取redux中间件
const sagaMiddleware = createSagaMiddleware({
sagaMonitor: {
// 打印 effect 便于分析redux-saga行为
effectTriggered(options) {
console.log(options);
}
}
})

function* rootSaga() {
const action = yield take('ADD_TODO_SAGA');
// delay(): { type: 'call', payload: { args: [1000], fn }}
yield delay(1000); // or yield call(delayUtil, 1000)

// put(): { type: 'PUT', payload: { action: {}, channel: null }}
yield put({ type: 'ADD_TODO', text: action.text });
}

const store = createStore(
todos,
['Use Redux'],
applyMiddleware(logger, sagaMiddleware),
);

// 启动saga
sagaMiddleware.run(rootSaga);

store.dispatch({ type: 'ADD_TODO_SAGA', text: 'Use Redux-saga' });

可以看到这里抛出的就是一个纯action, saga在启动之后监听 ADD_TODO_SAGA 事件,若事件发生执行后续代码。

源码

stdChannel

在开始createSagaMiddleware之前,先来了解一下 channel
redux-saga 通过 channel 接收与发出action与外部进行数据交换
在redux-saga中有三种 channel,分别是channel、eventChannel、multicastChannel;
在此我们仅仅分析一下用的最多的 multicastChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
export function multicastChannel() {
let closed = false
// 这里taker分为的currentTakers、 nextTakers的原因和redux subscribe类似,防止在遍历taker时,taker发生变化。
let currentTakers = []
let nextTakers = currentTakers

const ensureCanMutateNextTakers = () => {
if (nextTakers !== currentTakers) {
return
}
nextTakers = currentTakers.slice()
}

const close = () => {
closed = true
const takers = (currentTakers = nextTakers)

for (let i = 0; i < takers.length; i++) {
const taker = takers[i]
taker(END)
}

nextTakers = []
}

return {
[MULTICAST]: true,
put(input) {

if (closed) {
return
}

if (isEnd(input)) {
close()
return
}

const takers = (currentTakers = nextTakers)
// 遍历takers,找到与input匹配的taker并执行它。
for (let i = 0; i < takers.length; i++) {
const taker = takers[i]
if (taker[MATCH](input)) {
taker.cancel()
taker(input)
}
}
},
// 存下callback,与配置函数
take(cb, matcher = matchers.wildcard) {
if (closed) {
cb(END)
return
}
cb[MATCH] = matcher
ensureCanMutateNextTakers()
nextTakers.push(cb)

cb.cancel = once(() => {
ensureCanMutateNextTakers()
remove(nextTakers, cb)
})
},
close,
}
}

export function stdChannel() {
const chan = multicastChannel()
const { put } = chan
chan.put = input => {
if (input[SAGA_ACTION]) {
put(input)
return
}
// 暂时不用管
asap(() => put(input))
}
return chan
}

createSagaMiddleware

获取redux-middleware, 同时初始化runsaga函数,为后面启动saga bind 所需的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
export default function sagaMiddlewareFactory({ context = {}, ...options } = {}) {
const { sagaMonitor, logger, onError, effectMiddlewares } = options
let boundRunSaga

// redux middleware
function sagaMiddleware({ getState, dispatch }) {
// 新建一个channel
const channel = stdChannel()
channel.put = (options.emitter || identity)(channel.put)

boundRunSaga = runSaga.bind(null, {
context,
channel,
dispatch,
getState,
sagaMonitor,
logger,
onError,
effectMiddlewares,
})

return next => action => {
if (sagaMonitor && sagaMonitor.actionDispatched) {
sagaMonitor.actionDispatched(action)
}
const result = next(action) // hit reducers

// 将事件传递给saga
channel.put(action)
return result
}
}

// 启动saga
sagaMiddleware.run = (...args) => {
// ...

return boundRunSaga(...args)
}

//...

return sagaMiddleware
}

runsaga

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
export function runSaga(options, saga, ...args) {

// generate iterator
const iterator = saga(...args)

const {
channel = stdChannel(),
dispatch,
getState,
context = {},
sagaMonitor,
logger,
effectMiddlewares,
onError,
} = options

const effectId = nextSagaId()

// 一些错误检查
// ...

const log = logger || _log
const logError = err => {
log('error', err)
if (err && err.sagaStack) {
log('error', err.sagaStack)
}
}

const middleware = effectMiddlewares && compose(...effectMiddlewares)

// 可以先理解为 finalizeRunEffect = runEffect => runEffect
const finalizeRunEffect = runEffect => {
if (is.func(middleware)) {
return function finalRunEffect(effect, effectId, currCb) {
const plainRunEffect = eff => runEffect(eff, effectId, currCb)
return middleware(plainRunEffect)(effect)
}
} else {
return runEffect
}
}

const env = {
stdChannel: channel,
dispatch: wrapSagaDispatch(dispatch),
getState,
sagaMonitor,
logError,
onError,
finalizeRunEffect,
}

// 新建task,作用是控制 Generator 流程,类似与自动流程管理,这个后面会讲到
const task = proc(env, iterator, context, effectId, getMetaInfo(saga), null)

return task
}

redux-saga的核心就是task, 控制generator函数saga执行流程。是一个复杂的自动流程管理,我们先看一个简单的自动流程管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 一个返回promise的delay函数
const delay = (ms) => {
return new Promise((res) => {
setTimeout(res, ms);
});
}

function *main() {
yield delay(1000);
console.log('1s later');
yield delay(2000);
console.log('done');
}

// 为了达到想要的执行结果,我们必须在promise resolved之后再执行next statement,比如这样
const gen = main();
const r1 = gen.next();
r1.value.then(() => {
const r2 = gen.next();
r2.value.then(() => {
gen.next();
})
})

使用递归实现,自动流程控制

1
2
3
4
5
6
7
8
9
10
11
12
function autoRun(gfunc) {
const gen = gfunc();

function next() {
const res = gen.next();
if (res.done) return;
res.value.then(next);
}

next();
}
autoRun(main);

上面的自动流程控制函数仅仅支持 promise。

proc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
export default function proc(env, iterator, parentContext, parentEffectId, meta, cont) {
// ...

const task = newTask(parentEffectId, meta, cont)
const mainTask = { meta, cancel: cancelMain, _isRunning: true, _isCancelled: false }

// 构建 task tree
const taskQueue = forkQueue(
mainTask,
function onAbort() {
cancelledDueToErrorTasks.push(...taskQueue.getTaskNames())
},
end,
)

next()

// then return the task descriptor to the caller
return task

function next(arg, isErr) {
let result
if (isErr) {
result = iterator.throw(arg)
} else if (shouldCancel(arg)) {
// ...
} else if (shouldTerminate(arg)) {
// ...
} else {
result = iterator.next(arg)
}

if (!result.done) {
// 如果没结束, 执行相应 effect
digestEffect(result.value, parentEffectId, '', next)
} else {
/**
This Generator has ended, terminate the main task and notify the fork queue
**/
mainTask._isRunning = false
mainTask.cont(result.value)
}
}

function digestEffect(effect, parentEffectId, label = '', cb) {
// 封装了cb函数 增加了事件钩子
function currCb(res, isErr) {
if (effectSettled) {
return
}

effectSettled = true
cb.cancel = noop // defensive measure
if (env.sagaMonitor) {
if (isErr) {
env.sagaMonitor.effectRejected(effectId, res)
} else {
env.sagaMonitor.effectResolved(effectId, res)
}
}
if (isErr) {
crashedEffect = effect
}
cb(res, isErr)
}

runEffect(effect, effectId, currCb)
}

// 每个 effect 的执行函数 这里先看一下常用的几个effect
function runEffect(effect, effectId, currCb) {
if (is.promise(effect)) {
resolvePromise(effect, currCb)
} else if (is.iterator(effect)) {
resolveIterator(effect, effectId, meta, currCb)
} else if (effect && effect[IO]) {
const { type, payload } = effect
if (type === effectTypes.TAKE) runTakeEffect(payload, currCb)
else if (type === effectTypes.PUT) runPutEffect(payload, currCb)
else if (type === effectTypes.CALL) runCallEffect(payload, effectId, currCb)
// 其他所有的effect ...
else currCb(effect)
} else {
// anything else returned as is
currCb(effect)
}
}

// 当返回值是 promise 时,就和之前实现的自动进程控制函数一样嘛
function resolvePromise(promise, cb) {
// ...
promise.then(cb, error => cb(error, true))
}

// 当是generator函数时
function resolveIterator(iterator, effectId, meta, cb) {
proc(env, iterator, taskContext, effectId, meta, cb)
}

// 当是 take 就把callback放在channel里,如果有匹配事件发生,触发 callback
function runTakeEffect({ channel = env.stdChannel, pattern, maybe }, cb) {
const takeCb = input => {
if (input instanceof Error) {
cb(input, true)
return
}
if (isEnd(input) && !maybe) {
cb(TERMINATE)
return
}
cb(input)
}
try {
channel.take(takeCb, is.notUndef(pattern) ? matcher(pattern) : null)
} catch (err) {
cb(err, true)
return
}
cb.cancel = takeCb.cancel
}

function runPutEffect({ channel, action, resolve }, cb) {
asap(() => {
let result
try {
// 发送 action
result = (channel ? channel.put : env.dispatch)(action)
} catch (error) {
cb(error, true)
return
}

if (resolve && is.promise(result)) {
resolvePromise(result, cb)
} else {
cb(result)
}
})
// put 是不能取消的
}

function runCallEffect({ context, fn, args }, effectId, cb) {
let result

try {
result = fn.apply(context, args)
} catch (error) {
cb(error, true)
return
}
return is.promise(result)
? resolvePromise(result, cb)
: is.iterator(result)
? resolveIterator(result, effectId, getMetaInfo(fn), cb)
: cb(result)
}
}

总结

redux-saga 将异步操作抽象为 effect,利用 generator 函数,控制saga流程。
到目前为止,只是涉及了一些基本流程,下一篇会对本篇进行补充。