Worker 通信
API
很简单,只有
postMessage
和
onmesssage
两个,通信过程如图所示:
Worker 通信 API 虽然简洁,但是它们是基于事件驱动的,
onmesssage
必须通过回调函数去接收消息,并且多次同类型通信要匹配到各自的响应使用
onmesssage
只能通过某种标志去区分,这就导致代码组织会很凌乱,缺少易用性,增加了调试的困难,降低了排查问题的效率,因此业务使用 Web Worker 一般都会对原生通信 API 进行封装,如封装为 Prmoise 调用。
所有代码组织基于
worker-loader
书写形式,其他形式请根据具体规则进行修改。
postMessage 数据传输方式有多种,文章内容基于 Structured Clone 类型,如需了解更多关于通信的内容,请参考文末文献
通信 Promise 封装
Prmoise 封装思路很简单,那就是把
postMessage
,
onmesssage
装进
new Promise
中,然后在
onmesssage
的回调函数中 resolve 结果,在出错时 reject 错误,即可。
main.js 封装
有了之前的问题分析和上述一次性通信思路,分析下常规封装思路,首先是 main.js:
形式上,需要要调用一个新方法,方法返回一个
Promise
对象,
Promise
对象内部调用
postMessage
发送消息,然后等待
onmesssage
返回结果,根据结果调用
resolve
或
reject
改变
Promise
状态。
逻辑上,创建一个接收消息时调用
resolve
或
reject
的回调函数 callback ,
postMessage
时为了表示回复的是哪一个消息需要创建一个唯一的消息 ID,并且建立 ID 和 callback 的映射关系,全局存储。
消息 ID 经过传递,最终会在
onmesssage
时随着结果 result 一起返回,根据 ID 找到 对应的 callback,并处理 result,最终改变
Promise
状态,返回结果。
代码如下:
import { generateUUID } from '@/utils';
* @description: Promise 化 Worker 类,使用此类需要在 worker.js 中 postMessage 时 带上 messageId,或者使用 registerPromiseWorker 类
* @param { worker } worker 实例
* @return {*}
export const PromiseWorker = class {
#callbacks = {};
worker = null;
constructor(worker) {
this.worker = worker;
worker.addEventListener('message', (e) => {
const msg = e.data;
if (!Array.isArray(msg) || msg.length < 2) return;
const [messageId, error, result] = msg;
const callback = this.#callbacks[messageId];
if (!callback) return;
delete this.#callbacks[messageId];
callback(error, result);
postMessage(initMsg) {
const messageId = generateUUID();
const msg = [messageId, initMsg];
return new Promise((resolve, reject) => {
this.#callbacks[messageId] = function(error, result) {
if (error) return reject(new Error(error));
resolve(result);
this.worker.postMessage(msg);
也可以基于 onmessage
封装,思路大同小异,分析如下:
需要一个新方法 createPromise
,参数接收一个消息ID,返回一个 Promise
, Promise
内部调用 onmessage
。
需要一个新方法 postMessage
,内部先调用 createPromise
返回一个 Promise
,同时触发 onmessage
,然后执行真正的 postMessage
方法发出消息,参数也需要携带唯一的消息 ID,最终把 Promise
返回出去,等待Promise
结果,也就是 onmessage
的结果。
createPromise
内部根据 ID 匹配判断是不是符合规范并且是需要的结果,如果不是需要的就不处理让 Promise
继续处于 pending
状态,反之根据结果把Promise
设置成 fulfilled
状态或者 resolved
状态
代码如下:
import { generateUUID } from '@/utils';
* @description: Promise 化 Worker 类,使用此类需要在 worker.js 中 postMessage 时 带上 messageId,或者使用 registerPromiseWorker 类
* @param { worker } worker 实例
* @return {*}
export const PromiseWorker = class {
worker = null;
constructor(worker) {
this.worker = worker;
_createPromise(id) {
return new Promise((resolve, reject) => {
const listener = (e) => {
const msg = e.data;
if (Array.isArray(msg) && msg.length >= 2 && msg[0] === id) {
const error = msg[1];
const result = msg[2];
if (error) return reject(new Error(error));
resolve(result);
this.worker.removeEventListener('message', listener);
this.worker.addEventListener('message', listener);
postMessage(initMsg) {
const messageId = generateUUID();
const promise = this._createPromise(messageId);
const msg = [messageId, initMsg];
this.worker.postMessage(msg);
return promise;
worker.js 封装
接着是 worker.js,只需要在计算处理完毕后,在响应回复中带上请求的 ID 即可:
export const WorkerThreadController = class {
#callback = null;
constructor(callback) {
this.#callback = callback;
this.worker = self;
this.worker.onmessage = this.onmessage.bind(this);
async onmessage(e) {
const payload = e.data;
if (!Array.isArray(payload) || payload.length !== 2) return;
const [messageId, message] = payload;
const result = await this.#callback(message);
this.worker.postMessage([messageId, result]);
为了增加框架的鲁棒性,worker.js 中我们还应该加入错误处理以及 callback 返回值仍然为 Promise 情况的处理能力。完整版代码如下:
export const WorkerThreadController = class {
#callback = null;
constructor(callback) {
this.#callback = callback;
this.worker = self;
this.worker.onmessage = this.onmessage.bind(this);
* @description: 处理 onmessage 事件
* @param {*} e 传递事件
* @return {*}
async onmessage(e) {
const payload = e.data;
if (!Array.isArray(payload) || payload.length !== 2) return;
const [messageId, message] = payload;
let result = null;
try {
const callbackResult = await this.#callback(message);
result = { res: callbackResult };
} catch (e) {
result = { err: e };
if (result.err) this._postMessage(messageId, result.err);
else if (!this._isPromise(result.res)) this._postMessage(messageId, null, result.res);
else {
result.res
.then(res => this._postMessage(messageId, null, res))
.catch(err => this._postMessage(messageId, err))
* @description: 判断是不是 Promise
* @param {*} Obj 判断对象
* @return {*}
_isPromise(obj) {
return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function';
* @description: 封装 postMessage 方法
* @param {*} messageId
* @param {*} error
* @param {*} result
* @return {*} [messageId, error, result]
_postMessage(messageId, error, result) {
if (error) {
console.error('Worker caught an error:', error);
this.worker.postMessage([messageId, { message: error.message }]);
} else {
this.worker.postMessage([messageId, null, result])
至此,一个简单好用的 Promise Worker
管理模块则就建立完成了,用户使用和普通代码几乎无差异,对通信过程无感知。使用示例如下:
import { PromiseWorker } from '@/utils/web-worker.js';
import Worker from './xxx.worker.js';
const workerPromise = new PromiseWorker(new Worker());
const workerRes = await workerPromise.postMessage('...');
import { WorkerThreadController } from '@/utils/web-worker.js';
new WorkerThreadController(async (event) => {
return '...';
一次性通信
考虑到现在流行的一些类库都有类似的封装,比如 vue-worker 中的 run
方法,创建一个一次性的 Worker,运行完毕就销毁,此方法类似于 Promise.resolve()
,但在是在另一个线程中。
这里我们也来在PromiseWorker的封装中增加一个 run 方法,如下:
export const PromiseWorker = class {
async run(initMsg) {
try {
const result = await this.postMessage(initMsg);
this.worker.terminate();
return result;
} catch(e) {
this.worker.terminate();
throw e;
这里要注意:浏览器创建销毁线程都是有代价的,线程的频繁新建会消耗资源,而每个 Worker 线程会有约 1M 的固有内存消耗,因此大多数场景下,Worker 线程应该用作常驻的线程,开发中优先复用常驻线程。
一次性执行,只适合特定场景,比如预加载网站数据,多页面应用切换页面加速等等场景。
通信 RPC 封装
基于 Promise 的封装已经大大增加了 Web Worker 的易用性,但是我们还必须考虑 worker.js 中逻辑复杂度的问题,如果一个 Worker 中要处理多种不同的逻辑,那 worker.js 中的代码组织将会变得异常繁杂,难以维护。
理想中我们希望 Web Worker 作为插件与框架之间通过接口定义进行通信,就像我们调用一个后端 API 一样,这样可以保证开发风格的一致化,也可以对同一个 Worker 进行最大限度的扩展,如下以 API 直接调用的形式:
const workerInstance = new Worker('./worker.js');
export async function getData(args) {
await workerInstance.getData(args);
async function xxx() {
const data = await getData()
class RemoteAPI {
constructor() {}
getData() {}
serializeData() {}
如何实现上述效果,说到这里,其实大部分有经验的小伙伴可能已经有答案了,那就是RPC(Remote Procedure Call)远程过程调用,简单概括下就是要像调用本地的函数一样去调远程函数,『远程』通常指服务器,这里是 Worker,即客户端像调用本地的函数一样去调用 Worker 中的函数。
基于 RPC 封装 Worker,过程非常复杂,文章只介绍下原理和简单实现,实际项目中建议大家采用第三方库,如 comlink、workerize-loader 和 alloy-worker 等,避免踩坑。
对 RPC 不了解的小伙伴,可以用 5 分钟左右看下这篇文章,还是讲的非常清晰且浅显易懂的。
RPC 实现有两种路线,一种是中间API描述的代码生成(如GRPC),另一种是利用反射/动态特性,运行时构造远端的调用,基于 javascript 的动态特性,对于第二种实现 RPC 具有优点(实现非常简短)。
下面以 comlink 为例讲解一下实现思路,comlink 基于 es6 的 proxy 特性实现,利用此特性可将 remote.xxx, 转成统一函数调用的方式,
Promise 封装和 RPC 封装都可以提升 Web Worker 的使用体验,大家开发过程中可以视情况选择其中一种。
关于 Web Worker 的多篇文章预备中,感兴趣的小伙伴可以关注下
Comlink 的使用 & 源码解析 | 轻松编写 Web Workers
分布式架构核心RPC原理
Web Worker 文献综述
web系统的插件架构之—— web-worker-rpc