WebWorker-专用线程-从0到1实现一个小型comlink(仿comlink)
目前Q&A,看看有无需要了解的部分
- Q1:为什么在Proxy中返回一个fn,需要bind?
- Q2:为什么Await一个Proxy会产生对then的一次拦截?
对于webWorker的封装,主要还是使用一些策略方法来处理调用方的请求,即在eventCallback中额外增加一个区别不同请求的key(type),以便于处理不同的任务
第一步,处理worker中的的message事件
expose用于为ep(Worker/MessagePort/etc)增加一个监听事件,一般对worker中需要暴露的对象使用
这里的expose方法不是最终的expose方法,会随着后续而改动
//先不用在意ts类型
const expose = (
rawObject: unknown,
ep: Endpoint = globalThis as any,
allowedOrigins: string[] = ['*']
) => {
ep.addEventListener('message', function call(ev: MessageEvent<MessageValue<WireValue>>) {
if (!ev || !ev.data) return; //TODO catchError;
if (!allowedOrigins.includes(`*`) && !allowedOrigins.includes(ev.origin as (typeof allowedOrigins)[number])) return; //TODO catchError;
const { id, path = [], type, value } = ev.data; //这里是内部用于传递数据的自定义类型
const parentObj = path.slice(0, -1).reduce((obj, prop) => obj[prop], rawObject);//取到其父对象,用其父对象调用方法,保证this指向不变
const propKey = path.slice(-1)[0];
let returnValue: any;
switch (type) {
case MessageType.GET:
{
returnValue = parentObj[propKey];
}
break;
default:
{
returnValue = undefined;
}
break;
}
if ('start' in ep) ep.start();
ep.postMessage(
{
id,
returnValue
}
);
});
if ('start' in ep) {
ep.start();
}
};
第二步,代理调用方的操作
处理完接收方,我们还需要处理调用方的逻辑,在处理调用方逻辑之前,我们还需要封装一个用于发送消息的函数,通过调用createMessage
向另一端发送消息,同时监听另一端处理完成数据之后返回的结果(也就是expose
函数中的ep.postMessage
)将其转换为一个Promise
const createMessage = (ep: Endpoint, obj: Partial<MessageValue>, transfer?: Transferable[]) => {
const id = generateUUID();//创建一个随机ID,用于区分不同的消息,因为存在多个文件调用同一个worker的情况,所以需要一个id去确认使用的双方
return new Promise((_res, _rej) => {
ep.addEventListener('message', function call(ev: MessageEvent<MessageValue>) {
if (!ev || !ev.data || ev.data.id !== id) return _rej();//这里应该直接return,后面会改动,并赋有解释
ep.removeEventListener('message', call);
_res(ev.data);
});
if ('start' in ep) ep.start();
ep.postMessage(
{
id,
...obj,
},
transfer
);
});
};
现在让我们来处理调用方的逻辑,将调用方对worker的调用,变为主线程让proxy去帮他操作worker,而交给调用方的是一个Promise
const wrap = (ep: Endpoint) => {
return createProxy(ep);
};
const createProxy = (ep: Endpoint, path: string[] = []) => {
const proxy = new Proxy(
{},//这里应该function(){},后面会改动,并赋有解释
{
get(_, prop) {
if (prop === 'then') {//这里表示只有 prop 为 then 时才发送消息执行取值操作,体现在 Comlink 上就是针对代理对象的操作都需要使用 await 获取。
//thenable
const r = createMessage(ep, {
type: MessageType.GET,
path: path,
})
return r.then.bind(r);//这里必须要返回bind之后的then
}
return createProxy(ep, [...path, prop]);
},
}
);
return proxy;
};
好了,我们对于wroker的封装已经初具雏形了,现在让我们来试着使用一下,在使用之前我们先用tsc转译成js文件
//main.js
import { wrap } from './harverWorker.js';
const proxy = wrap(new Worker('./worker1.js'));
console.log(await proxy.name);//22
console.log(proxy.name);//proxy
//worker1.js
importScripts('selfWorker.js');
// 要共享的对象或函数
const myObject = {
name: '22',
};
MyWorker.expose(myObject);
结果是符合预期的
Q1:为什么在Proxy中返回一个fn,需要bind?
因为Proxy存在一定的局限性,对于一些JS中的内置对象,比如Map,Promise等等,这些都使用了内部插槽。内部插槽类似于属性,但是仅限内部使用,用于在对象上记录状态或数据。
例如Map将项目存储在[[MapData]],内置的方法就可以直接访问他们,不会通过[[Get]/[[Set]]],所以Proxy拦截不到。
对于proxy,它会将目标对象上的方法和访问器内部的this的指向修改为代理对象,即使Proxy
未代理任何操作(handler为)
这些对象被代理之后,调用proxy.set相当于Map.prototype.set.call(proxy, 1, 2)
,Map.prototype.set
即访问内部槽proxy.[[MapData]]
。由于proxy
对象没有此内部槽,将抛出错误。所以调用内置的方法将会失败
const map = new Map();
const proxy = new Proxy(map, {});
console.log(proxy.set === new Map().set)//true
proxy.set('test', 1); // Error
其实,这个挺好理解,函数的调用者是谁,this就指向谁
我们可以手动将this指向改为正确的对象
const proxy = new Proxy(map, {
get(target, property, receiver) {
const value = Reflect.get(target, property, receiver)
if (target instanceof Map) {
value = value.bind(target)
}
return value
},
})
对于Promise
实例,proxy
上没有 [[PromiseState]]内部槽,所以需要将then进行bind
Q2:为什么Await一个Proxy会产生对then的一次拦截?
thenable:thenable对象指的是具有then方法的对象
thenable = {then: function (resolve, reject) {resolve(42);}};
这是因为 await v
,会将其转换为Promise.resolve(v)
。
而Promise.resolve()
静态方法将给定值“解析”为 Promise
,如果值是 promise,则返回该 promise;如果该值是 thenable,则将调用其then方法,并为其准备的两个回调;
如果要确定一个对象是否是thenable,需要判断其是否具有then方法,所以表现在proxy中,就是对其then的一次拦截
注意一点避免被绕进去,判断具有then方法,指的是拦截then之后返回的得是一个fn,而不是一个obj,如果不是fn,await 得到的不是返回的obj,而是原来的obj(也就是proxy)
第三步,实现SET
能够传递给worker的数据是能够被结构化克隆算法处理的数据,但是函数不具备这种性质,所以目前还不能赋值属性为一个function
const createProxy = (ep: Endpoint, path: string[] = []) => {
const proxy = new Proxy(
{},
{
get(_, prop) {
if (prop === 'then') {
const r = createMessage(ep, {
type: MessageType.GET,
path: path,
});
return r.then.bind(r);
}
return createProxy(ep, [...path, prop]);
},
set(_, prop, newValue) {
return createMessage(ep, {
type: MessageType.SET,
path: [...path, prop].map(p => p.toString()),
value: newValue,
})
},
}
);
return proxy;
};
switch (type) {
case MessageType.GET:
{
returnValue = parentObj[propKey];
}
break;
case MessageType.SET:
{
parentObj[propKey] = value;
returnValue = true;
}
break;
default:
{
returnValue = undefined;
}
break;
}
这里需要注意的一个地方是,在createMessage中不应该对id不相同的消息给reject掉,
原因是因为set操作没有阻塞的(没有使用await),所以如果在set下面再读取另一端的属性(await proxy.name),这时就会在上一个
SET
的事件没removeEventListener
前又addEventListener
一个GET
的事件,这样在任意一个事件解绑前,worker那边发送的消息都会给这两个callback都去发送一次,所以id这时就可以去区分真正需要的消息,因此我们不能给他reject掉,否则可能会导致在正确的消息传过来之前我们对传过来的错误消息throw error,从而让promise达到终态例如await proxy.name 如果我们在其前面有
proxy.name=222
,那么set可以正确的被处理,但是到了get时,get的callback会先收到set的一次message,然后我们如果进行了rej,那么createMessage
所创建的promise会变为终态,这时获取值的操作已经有结果了,后面虽然会再收到一次正确的get message 但是promise的状态已经无法发生更改了,只能继续流程取消callback同时他也不是很符合reject,因为本来使用的过程中就会存在多个
EventListener
,这里直接return是最正确的处理
之前我以为开源作者漏写了rej,结果发现还是自己太年轻。。。
第四步,实现APPLY
在实现之前,有个地方要注意一下Proxy的target应该为一个function,原因是当我们调用例如proxy.getName()时,是会先走get获取getName的值,发现是function然后执行到
()
会被apply捕获。但在我们的实现中,非then都会返回一个Proxy,并非函数,所以当遇到getName的时候会返回一个proxy,然后执行
()
就会报错,提示is not a function
,但是如果我们的target是一个function,相当于就是proxy(),这样是可以成功调用的tsconst proxy1 = new Proxy(function () {}, {}); console.log(proxy1(1, 2));//success run
//expose
switch (type) {
case MessageType.GET:
{
returnValue = parentObj[propKey];
}
break;
case MessageType.SET:
{
parentObj[propKey] = value;
returnValue = true;
}
break;
case MessageType.APPLY:
{
returnValue = parentObj[propKey].apply(parentObj, ...value);
}
break;
default:
{
returnValue = undefined;
}
break;
}
//createProxy
const createProxy = (ep: Endpoint, path:string[] = []) => {
const proxy = new Proxy(function () {}, {
get(_, prop) {
if (prop === 'then') {
//这里表示只有 prop 为 then 时才发送消息执行取值操作,体现在 Comlink 上就是针对代理对象的操作都需要使用 await 获取。
//thenable
const r = createMessage(ep, {
type: MessageType.GET,
path: path,
});
return r.then.bind(r);
}
return createProxy(ep, [...path, prop]);
},
set(_, prop, newValue) {
return createMessage(ep, {
type: MessageType.SET,
path: [...path, prop].map(p => p.toString()),
value: newValue,
}) as any;
},
apply(_, __, rawArgumentList) {
return createMessage(ep, {
type: MessageType.APPLY,
path: [...path].map(p => p.toString()),
value: rawArgumentList,
}) as any;
},
});
return proxy;
};
第五步,整理内置数据传递格式 & 实现可转移对象
是时候统一一下内部通信时的传递格式了,WeakMap是一种弱引用映射,我们可以利用他来实现可转移对象与raw Object的绑定
注意一个地方,在实际使用中,worker中引入的模块和我们在主线程引入模块是两个不同的模块,也就是说transferCacheObject所记录的映射在主线程和worker中是不同的
const transferCacheObject = new WeakMap<any, Transferable[]>();
const fromWireValue = (val: WireValue) => {
switch (val.type) {
case ValueType.RAW: {
return val.value;
}
}
};
const toWireValue = (val: any): [WireValue, Transferable[]] => {
return [
{
type: ValueType.RAW,
value: val,
},
transferCacheObject.get(val) ?? [],
];
};
const transfer = <T>(obj: T, transfers: Transferable[]) => {
transferCacheObject.set(obj, transfers);
return obj;
};
将我们所有有数据传输的地方都给它套上
const createProxy = (ep: Endpoint, path:string[] = []) => {
const proxy = new Proxy(function () {}, {
get(_, prop) {
if (prop === 'then') {
//这里表示只有 prop 为 then 时才发送消息执行取值操作,体现在 Comlink 上就是针对代理对象的操作都需要使用 await 获取。
//thenable
const r = createMessage(ep, {
type: MessageType.GET,
path: path,
}).then(fromWireValue);
return r.then.bind(r);
}
return createProxy(ep, [...path, prop]);
},
set(_, prop, newValue) {
const [wireValue, transfer] = toWireValue(newValue);
return createMessage(
ep,
{
type: MessageType.SET,
path: [...path, prop].map(p => p.toString()),
value: wireValue,
},
transfer
) as any;
},
apply(_, __, rawArgumentList) {
const [wireValues, transfers] = rawArgumentList.map(toWireValue).reduce(
(array, cur) => {
array = [
[...array[0], cur[0]],
[...array[1], ...cur[1]],
];
return array;
},
[[], []]
);
return createMessage(
ep,
{
type: MessageType.APPLY,
path: [...path].map(p => p.toString()),
value: wireValues,
},
transfers
).then(fromWireValue) as any;
},
});
return proxy;
};
const expose = (rawObject: unknown, ep: Endpoint = globalThis as any, allowedOrigins: string[] = ['*']) => {
ep.addEventListener('message', function call(ev: MessageEvent<MessageValue<any>>) {
if (!ev || !ev.data) return; //TODO catchError;
if (!allowedOrigins.includes(`*`) && !allowedOrigins.includes(ev.origin as (typeof allowedOrigins)[number])) return; //TODO catchError;
const { id, path = [], type, value } = ev.data; //这里是内部用于传递数据的自定义类型
const parentObj = path.slice(0, -1).reduce((obj, prop) => obj[prop], rawObject); //取到其父对象,用其父对象调用方法,保证this指向不变
const propKey = path.slice(-1)[0] as any;
let returnValue: any;
switch (type) {
case MessageType.GET:
{
returnValue = parentObj[propKey];
}
break;
case MessageType.SET:
{
parentObj[propKey] = fromWireValue(value);
returnValue = true;
}
break;
case MessageType.APPLY:
{
returnValue = parentObj[propKey].apply(parentObj, value.map(fromWireValue));
}
break;
default:
{
returnValue = undefined;
}
break;
}
if ('start' in ep) ep.start();
Promise.resolve(returnValue).then(value => {
const [wireValue, transfer] = toWireValue(value);
ep.postMessage(
{
...wireValue,
id,
},
transfer
);
});
});
if ('start' in ep) {
ep.start();
}
};
第六步,实现函数传递并保留上下文
目前在我们的的实现中无法实现函数的传递,但是我们可以借助MessageChannel来改变一种实现的方式,MessageChannel可以在不同线程中通信,那么我们也可以仿造代理worker线程一样,对MessageChannel也进行代理,具体来说MessageChannel会给出两个port(port1、port2),我们可以代理port1,接受来自port2的msg并做出处理
const transferHandle: Handle = {
canHandle: (obj: unknown) => isObject(obj) && obj[proxyMarker],
serialize: (obj: unknown): [MessagePort, Transferable[]] => {
const { port1, port2 } = new MessageChannel();
expose(obj, port1); //这里要分清楚是在主线程环境下调用的还是在worker下调用的proxy(),如果是在主线程,那么是worker线程传递消息给主线程,主线程处理消息,反之则是worker线程处理消息
return [port2, [port2]]; //将另一个port传递给信道的另一端
},
deserialize: (port2: MessageChannel['port1']) => {
if ('start' in port2) port2.start();
return wrap(port2); //接受传递来的另一端口,并将其交给proxy
},
};
const handles = new Map<string, Handle>([['proxyMarker', transferHandle]]);
const fromWireValue = (val: WireValue) => {
switch (val.type) {
case ValueType.RAW: {
return val.value;
}
case ValueType.HANDLER: {
return handles.get(val.name).deserialize(val.value);
}
}
};
const toWireValue = (val: any): [WireValue, Transferable[]] => {
for (const [name, handle] of handles) {
if (handle.canHandle(val)) {
const [value, transfer] = transferHandle.serialize(val);
return [
{
type: ValueType.HANDLER,
value,
name,
},
transfer,
];
}
}
return [
{
type: ValueType.RAW,
value: val,
},
transferCacheObject.get(val) ?? [],
];
};
const proxy = (obj: Record<string, any>) => {
return Object.assign(obj, { [proxyMarker]: true });
};
测试一下
function callback(value) {
alert(`Result: ${value}`);//window独有,worker中不存在
}
async function init() {
const remoteFunction = wrap(new Worker('worker1.js'));
await remoteFunction(proxy(callback));
}
init();
//worker
async function remoteFunction(cb) {
await cb('A string from a worker');
}
MyWorker.expose(remoteFunction);
符合我们预期
第七步,回收信道
对于已经完成计算的worker/port,需要关闭ep
FinalizationRegistry可以在对值进行垃圾回收时请求回调,也就是GC时会调用的清理callback,一般调用就表示这个值已经被垃圾回收了
const releaseProxy = Symbol('releaseProxy');
const proxyFinalizers =
'FinalizationRegistry' in globalThis &&
new FinalizationRegistry((ep: Endpoint) => {
proxyCount.set(ep, (proxyCount.get(ep) ?? 1) - 1);
if (!proxyCount.get(ep)) {
releaseEndpoint(ep);
}
});
function isMessagePort(endpoint: Endpoint): endpoint is MessagePort {
return endpoint.constructor.name === 'MessagePort';
}
function closeEndPoint(endpoint: Endpoint) {
if (isMessagePort(endpoint)) endpoint.close();
}
const releaseEndpoint = (ep: Endpoint) => {
return createMessage(ep, { type: MessageType.RELEASE }).then(_ => {
closeEndPoint(ep);
});
};
const unregisterProxy = (proxy: any) => {
if (proxyFinalizers) {
proxyFinalizers.unregister(proxy);
}
};
const registerProxy = (proxy: any, ep: Endpoint) => {
proxyCount.set(ep, (proxyCount.get(ep) ?? 0) + 1);
if (proxyFinalizers) {
proxyFinalizers.register(proxy, ep, proxy);
}
};
const throwIfProxyReleased = (isReleased: boolean) => {
if (isReleased) {
throw new Error('Proxy has been released and is not useable');
}
};
const createProxy = (ep: Endpoint, path: string[] = []) => {
let isProxyReleased = false;
const proxy = new Proxy(function () {}, {
get(_, prop) {
throwIfProxyReleased(isProxyReleased);
if (prop === releaseProxy) {
return () => {
unregisterProxy(proxy);
releaseEndpoint(ep);
isProxyReleased = true;
};
}
if (prop === 'then') {
//这里表示只有 prop 为 then 时才发送消息执行取值操作,体现在 Comlink 上就是针对代理对象的操作都需要使用 await 获取。
//thenable
const r = createMessage(ep, {
type: MessageType.GET,
path: path,
}).then(fromWireValue);
return r.then.bind(r);
}
return createProxy(ep, [...path, prop]);
},
set(_, prop, newValue) {
const [wireValue, transfer] = toWireValue(newValue);
return createMessage(
ep,
{
type: MessageType.SET,
path: [...path, prop].map(p => p.toString()),
value: wireValue,
},
transfer
) as any;
},
apply(_, __, rawArgumentList) {
const [wireValues, transfers] = rawArgumentList.map(toWireValue).reduce(
(array, cur) => {
array = [
[...array[0], cur[0]],
[...array[1], ...cur[1]],
];
return array;
},
[[], []]
);
return createMessage(
ep,
{
type: MessageType.APPLY,
path: [...path].map(p => p.toString()),
value: wireValues,
},
transfers
).then(fromWireValue) as any;
},
});
registerProxy(proxy, ep);
return proxy;
};
const expose = (rawObject: unknown, ep: Endpoint = globalThis as any, allowedOrigins: string[] = ['*']) => {
ep.addEventListener('message', function call(ev: MessageEvent<MessageValue<any>>) {
if (!ev || !ev.data) return; //TODO catchError;
if (!allowedOrigins.includes(`*`) && !allowedOrigins.includes(ev.origin as (typeof allowedOrigins)[number])) return; //TODO catchError;
const { id, path = [], type, value } = ev.data; //这里是内部用于传递数据的自定义类型
const parentObj = path.slice(0, -1).reduce((obj, prop) => obj[prop], rawObject); //取到其父对象,用其父对象调用方法,保证this指向不变
const rawValue = path.reduce((obj, prop) => obj[prop], rawObject) as any;
let returnValue: any;
switch (type) {
case MessageType.GET:
{
returnValue = rawValue;
}
break;
case MessageType.SET:
{
parentObj[path.slice(-1)[0]] = fromWireValue(value);
returnValue = true;
}
break;
case MessageType.APPLY:
{
returnValue = rawValue.apply(parentObj, value.map(fromWireValue));
}
break;
case MessageType.RELEASE:
default:
{
returnValue = undefined;
}
break;
}
if ('start' in ep) ep.start();
Promise.resolve(returnValue).then(value => {
const [wireValue, transfer] = toWireValue(value);
ep.postMessage(
{
...wireValue,
id,
},
transfer
);
if (type === MessageType.RELEASE) {
//释放
ep.removeEventListener('message', call as any);
closeEndPoint(ep);
}
});
});
if ('start' in ep) {
ep.start();
}
};
第八步,错误处理
const throwMarker = Symbol('error');
const throwHandler: Handle<ErrorValue, any> = {
canHandle: value => isObject(value) && obj[throwMarker],
serialize(obj: unknown) {
if (obj instanceof Error) {
return [
{
isError: true,
value: {
message: obj.message,
name: obj.name,
stack: obj.stack,
},
},
[],
];
} else {
return [{ isError: false, value: obj }, []];
}
},
deserialize(serialized) {
if (serialized.isError) {
throw Object.assign(new Error(serialized.value.message), serialized.value);
}
throw serialized.value;
},
};
const handles = new Map<string, Handle<any, any>>([
['proxyMarker', transferHandle],
['throwMarker', throwHandler],
]);
const expose = (rawObject: unknown, ep: Endpoint = globalThis as any, allowedOrigins: string[] = ['*']) => {
ep.addEventListener('message', function call(ev: MessageEvent<MessageValue<any>>) {
if (!ev || !ev.data) return; //TODO catchError;
if (!allowedOrigins.includes(`*`) && !allowedOrigins.includes(ev.origin as (typeof allowedOrigins)[number])) return; //TODO catchError;
const { id, path = [], type, value } = ev.data; //这里是内部用于传递数据的自定义类型
let returnValue: any;
try {
const parentObj = path.slice(0, -1).reduce((obj, prop) => obj[prop], rawObject); //取到其父对象,用其父对象调用方法,保证this指向不变
const rawValue = path.reduce((obj, prop) => obj[prop], rawObject) as any;
switch (type) {
case MessageType.GET:
{
returnValue = rawValue;
}
break;
case MessageType.SET:
{
parentObj[path.slice(-1)[0]] = fromWireValue(value);
returnValue = true;
}
break;
case MessageType.APPLY:
{
returnValue = rawValue.apply(parentObj, value.map(fromWireValue));
}
break;
case MessageType.RELEASE:
default:
{
returnValue = undefined;
}
break;
}
} catch (value) {
returnValue = { value, throwMarker: true };
}
Promise.resolve(returnValue)
.catch(value => {
return { value, [throwMarker]: 0 };
})
.then(value => {
const [wireValue, transfer] = toWireValue(value);
ep.postMessage(
{
...wireValue,
id,
},
transfer
);
if (type === MessageType.RELEASE) {
//释放
ep.removeEventListener('message', call as any);
closeEndPoint(ep);
}
})
.catch(value => {
return { value, [throwMarker]: 0 };
});
});
if ('start' in ep) {
ep.start();
}
};
到此,我们对worker主要的封装处理已经完毕了,其中错误还望指出,后续会及时修改