Lambda Academy

活用控制反转 -- 解决一个棘手信息传递问题

July 18, 2018

在我初学编程的时候,还没写过完整点的项目就看过了一些高阶概念。在没有实践时,这些概念的神奇和强大之处很难被完全体会的。而一旦自己在摸索中应用了,瞬间觉得打开了一扇大门,技能又提升了一个层次。控制反转(Inversion of Control)就是这些强大概念之一。一年前在 MPJ 老师的频道上了解到了,但一直没自己独立创造场景用过。直到最近在项目中遇到个坑才用起来。

其实控制反转或者依赖注入(这两个感觉是同一个东西,看你从什么角度看)在前端框架中已经大量使用了。最早由 Angular 普及,后续的现代框架都有应用。比如 React 开发中目前最火的组件设计模式 Render Props,就是控制反转的一种应用。离开框架,在日常开发中,应用这种技巧可以帮助我们解决很多棘手的问题,今天就讲下我在开发中的一次应用。

Edit: 这里我认知有误。Render Props 不是对依赖注入的应用。

项目场景是这样的:技术栈是 Nuxt + Vuex。项目需要连接 Web Socket,然后根据 Socket 传来的数据,对 Vuex 里面相应的数据进行修改。公司为了节约成本,将 Socket 数据压缩了,而且不是全量推送,这要求前端收到数据后对数据进行解压,然后对数据进行遍历查找,更新,重新计算和排序,总之对 Socket 数据的处理非常复杂。为了不影响性能,我把 Socket 连接和数据处理放进了 Web Worker。先来看下项目结构。

下面是我封装的一个 Socket 工厂函数:

// utils/socket.js
export default function Socket() {
  let heartBeat; // 心跳记录
  let lost = 0; // 心跳失败次数

  function decLost() {
    lost -= 1;
  }

  function connect() {
    const socket = new WebSocket("wss://xx.com");

    socket.onopen = () => {
      heartBeat = setInterval(() => {
        socket.send(2);
        lost += 1;
        if (lost === 3) {
          // 心跳失败超过 3 次,断开重连
          clearInterval(heartBeat);
          socket.close();
          connect();
        }
      }, 5000);
    };

    socket.onerror = () => {
      clearInterval(heartBeat);
      socket.close();
    };

    socket.onclose = () => {
      setTimeout(() => {
        clearInterval(heart);
        connect();
      }, 3000);
    };
    return socket;
  }
  return Object.freeze({ decLost, connect });
}

Socket 连接实现了心跳机制。onopen 之后,每隔 5 秒向服务器发送 2,并把心跳失败次数加 1;服务器收到 2 之后会返回 3,客户端收到 3 之后再把心跳失败次数减 1。工厂函数暴露的 decLost 方法是为了外部在收到 3 之后把心跳次数减 1.

在 Web Worker 文件里面,调用 Socket 工厂函数,并连接 socket:

// workers/socket.js
import Socket from "~/utils/socket";

const socket = Socket();
const socketConnection = socket.connect();

socketConnection.onmessage = ({ data }) => {
  // 处理 socket 接收到的数据,
  // 处理完后通过 web worker 接口发出去
  postMessage(result);
};

// web worker 收到外部的数据后,把数据发给 socket
onmessage = ({ data }) => {
  socketConnection.send(data);
};

然后在一个 Nuxt 插件里,引入 socket worker,收到 worker 里传来的数据后,把数据交给 Vuex Store,反之,监听到相关 Vuex Mutation 后,把 payload 传给 worker:

// plugins/socket.js
// webpack 下导入 web worker 的方式:
import SocketWorker from "worker-loader!~/workers/socket.js";

const socketWorker = new SocketWorker();

export default ({ store }) => {
  store.subscribe((mutation, state) => {
    // 监听到相关 Vuex Mutation
  });

  socketWorker.onmessage = ({ data }) => {
    //监听 socket 发来的数据,收到数据后,
    // 通过 store.commit() 来把数据存入 vuex store
  };
};

这是我一开始写的 naive 版本,看起来主要功能实现了,而且封装和 Separation of concerns 做的也不错。写完刚跑起来,问题出现了。

挑战一:等 socket 连接成功后再发起订阅

当应用打开后,需要立即订阅推送数据,包括用户登录状态下的私有数据和其它基础数据等。但是当发起订阅时,socket 可能连接成功了,也可能还没连接成功。一开始我想设置个定时器,过两秒后再发起订阅。可是想想这种做法也太挫了。第二个思路是在 socket 连接的 onopen 事件里执行订阅。可是这样子会直接把以前的 onopen 覆盖掉,而且这样做违反了封装原则。剩下就一个办法了,等连接成功后再发请求。来看怎么做的:

// workers/socket.js
// ...
// const socketConnection ...

const waitForConnection = timeout =>
  new Promise((resolve, reject) => {
    const check = () => {
      if (socketConnection.readyState === 1) {
        resolve();
      } else if ((timeout -= 100) < 0) {
        reject("socket connection timed out");
      } else {
        setTimeout(check, 100);
      }
    };
    setTimeout(check, 100);
  });

// ... 其它细节

onmessage = async ({ data }) => {
  try {
    await waitForConnection(2000);
  } catch (e) {
    console.error(e);
    return;
  }
  socketConnection.send(data);
};

waitForConnection 函数会每隔 100 ms 检查 socket 连接状态,如果连接状态是 1(成功),则 resolve Promise,否则一直隔 100 ms 检查一次,直到连接成功或者超过指定时间。

在向 socket 发送数据之前,先调用 waitForConnection,并指定最多等 2 秒,确保连接成功后再发送数据。

问题看起来解决了。奇淫技巧都用上了,让我满意了一会儿。直到……

需求来了!

在 socket 断开重连后,需要续订之前的订阅。而包括用户 token 等订阅参数全都在 Vuex Store 里面。那这下头疼了,Vuex store 里面是没法知道断开重连的,而 worker 里面则根本没法读取 vuex store。知道这个需求后我内心是崩溃的,这根本没法写下去了啊!就在我都快要打算调整架构重写时,一拍脑袋灵光一闪,试试控制反转!

首先要让 Socket 工厂函数有个判断重连的机制。这个简单。

// utils/socket.js
export default function Socket() {
  let connectCount = 0; // 连接成功次数
  // ...细节,见文章前面
  socket.onopen = () => {
    // 每次连接成功,连接次数加1
    connectCount += 1;

    if (connectCount > 1) {
      // 若连接次数超过一次,则说明此次是重连
      // 在这里可以做些重连之后的操作了
    }
  };
  // ...
}

重连之后具体做什么事,这可以用依赖注入来实现。先在 worker 文件里定义要做的事情,然后在调用 Socket 工厂函数时注入方法:

// worker/socket
// 通过 postMessage 通知外部重连
const notifyReconnect = () => {
  postMessage({ type: "reconnect" });
};

const socket = Socket(notifyReconnect);

然后在 Socket 函数里接收一下:

// utils/socket.js
export default function Socket(notifyReconnect) {
  // ...细节,见文章前面
  socket.onopen = () => {
    connectCount += 1;
    if (connectCount > 1) {
      notifyReconnect();
    }
  };
  // ...
}

我以为写到这里应该就可以了的,然而我还是太天真了。运行后,plugins/socket 文件里能接收到重连消息,但是一直连接失败。这个问题很诡异,最后发现还是因为我对 Web Socket 掌握的不深导致的。每次 socket 连接后,生成的连接实例都是新的。而我在 waitForConnection 方法里监听的 socketConnection 在关闭后,readyState 一直是 3(关闭状态),导致 waitForConnection 方法一直报 timeout 错误。

剩下的最后问题是每次重连,都更新连接实例。方法如下:

// worker/socket
let socketConnection;

const notifyReconnect = connection => {
  postMessage({ type: "reconnect" });
  socketConnection = connection;
};

const socket = Socket(notifyReconnect);
socketConnection = socket.connect();
// utils/socket
export default function Socket(notifyReconnect) {
  // ...细节,见文章前面
  socket.onopen = () => {
    connectCount += 1;
    if (connectCount > 1) {
      notifyReconnect(socket);
    }
  };
  // ...
}

Socket 函数在调用 notifyReconnect 时,传入最新的连接实例 socket

至此,功能都实现了。完整代码如下:

// utils/socket.js
export default function Socket(notifyReconnect) {
  let heartBeat;
  let lost = 0;
  let connectCount = 0;

  function decLost() {
    lost -= 1;
  }

  function connect() {
    const socket = new WebSocket("wss://xx.com");

    socket.onopen = () => {
      connectCount += 1;
      if (connectCount > 1) {
        notifyReconnect(socket);
      }
      heartBeat = setInterval(() => {
        socket.send(2);
        lost += 1;
        if (lost === 3) {
          clearInterval(heartBeat);
          socket.close();
          connect();
        }
      }, 5000);
    };

    socket.onerror = () => {
      clearInterval(heartBeat);
      socket.close();
    };

    socket.onclose = () => {
      setTimeout(() => {
        clearInterval(heart);
        connect();
      }, 3000);
    };
    return socket;
  }
  return Object.freeze({ decLost, connect });
}
// workers/socket.js
import Socket from "~/utils/socket";

let socketConnection;

const notifyReconnect = connection => {
  postMessage({ type: "reconnect" });
  socketConnection = connection;
};

const socket = Socket(notifyReconnect);
socketConnection = socket.connect();

const waitForConnection = timeout =>
  new Promise((resolve, reject) => {
    const check = () => {
      if (socketConnection.readyState === 1) {
        resolve();
      } else if ((timeout -= 100) < 0) {
        reject("socket connection timed out");
      } else {
        setTimeout(check, 100);
      }
    };
    setTimeout(check, 100);
  });

socketConnection.onmessage = ({ data }) => {
  // 处理完数据后通过 web worker 接口发出去
  postMessage(result);
};

onmessage = async ({ data }) => {
  try {
    await waitForConnection(2000);
  } catch (e) {
    console.error(e);
    return;
  }
  socketConnection.send(data);
};
// plugins/socket.js
import SocketWorker from "worker-loader!~/workers/socket.js";

const socketWorker = new SocketWorker();

export default ({ store }) => {
  store.subscribe((mutation, state) => {});

  socketWorker.onmessage = ({ data }) => {
    if (data.type === "reconnect") {
      socketWorker.postMessage(/* 订阅参数 */);
    }
  };
};

Edit: 测试时发现上面写法有个问题。即重连后,虽然连接实例更新了,但是 onmessage 事件没有更新,导致重连后的 socket 数据没法处理。解决办法是把重连更新连接实例的逻辑放在 plugins/socket.js 文件。Socket 工厂函数的 connect 方法返回一个 Promise,在新的连接实例的 onopen 事件里 resolve promise。这样调整之后,就用不到 waitForConnection 函数了。


Lambda Academy

Lei Huang

这个博客我主要分享函数式编程和前端开发。我还有一个英文网站