banner
innei

innei

写代码是因为爱,写到世界充满爱!
github
telegram
twitter

利好 SharedWorker 实现跨页面单例 WebSocket

在之前的文章中,我详细的介绍了站点的实时人数是如何实现的?,在方案中,我留下了一个悬念,如何使用 SharedWorker 在页面之间共享 Socket 实例,实现跨页面单例 WebSocket。

动机#

探索这个问题的背景是无意间在知乎看到了 WebSocket 的一个问题,其中有回答提到 WebSocket 连接过于占用服务器资源。当页面重复打开很多个时,每个页面都会建立 WebSocket 连接,确实无意间的增大了服务器压力。虽说这个问题我之前也有考虑过,但是由于个人站点并不会出现大量的用户同时在线,所以并没有深入研究。我们使用的是 Socket.IO 的方案建立 WebSocket 连接,Socket.IO 本身就是对 WebSocket 的封装,所以从开销上来说,Socket.IO 会比原生 WebSocket 多一些开销。关于 Socket.IO 连接对服务器侧的内存开销,文档中有所提及:Memory Usage | Socket.IO

虽然这点性能优化不痛不痒也没有任何提升,但是既然有这个问题,那我们这次就试着去解决这个问题。

找到方案#

我们要解决的问题很明确,就是当一个浏览器打开两个或以上的我站的页面时(多个 Tab),复用同一个 Socket 实例。

通过搜索关键字,我们了解到一个 API,SharedWorker,SharedWorker 是一个在多个浏览上下文(例如多个窗口、标签或 iframe)之间共享的 Worker。SharedWorker 有一个全局作用域,可以在多个浏览上下文中使用,这样就可以实现我们的目标。

我们只需要把原本在 Socket 实例的代码放到 SharedWorker 中,然后在页面中与 Worker 中的 Socket 实例通信即可。由于之前的 SocketClient 对 Socket 进行了抽象,在此次重构中并不需要修改太多代码,而是只需要实现通信层即可。

SharedWorker 基本使用#

先来看看 SharedWorker 是如何使用的。

let i = 0

const ports = []

onconnect = (e) => {
  const port = e.ports[0];

  ports.push(port)

  port.addEventListener("message", (e) => {
    const workerResult = `Result: ${e.data[0] * e.data[1]}`;
    port.postMessage(workerResult);
  });
  port.start();
};
setInterval(() => {
  i++;
  ports.forEach(port => port.postMessage("Message from worker, time: " + i))
}, 1000);
const worker = new SharedWorker('/shared-worker.js')
worker.port.onmessage = (e) => {
  console.log('Message received from worker', e.data)
}
worker.port.start()

上面的例子中,我们创建了一个 SharedWorker,然后在页面中与 Worker 通信。在 Worker 中,我们监听了 onconnect 事件,当 Worker 连接时,我们监听了 port 的 message 事件,然后在页面中,我们监听了 port 的 message 事件,worker 中的消息在页面中进行进一步处理。

SharedWorker 同 Worker 一样,消息通过 MessageChannel 进行传递。而 worker.port 本质就是对 MessageChannel 的封装。

现在我们运行这个基本的例子,看看效果。

https://codesandbox.io/p/sandbox/ecstatic-jerry-6xsgvs

当我们同时打开两个 Tab 时,我们可以看到两个 Tab 都会收到 Worker 发送的消息。

image

通过 chrome://inspect/#workers 我们可以看到 Worker 的信息。当打开多个 Tab 时,SharedWorker 会在多个 Tab 之间共享;当多个页面都被关闭时,Worker 会被销毁。

在 Next.js 中使用 SharedWorker#

上面的例子中,我们把 worker 的实现放在了 public 目录下,这样的方式并不适用于工程化的项目中,因为在工程化项目中,我们不应该直接引用 public 目录下的文件,而是应该通过 import 的方式引入。第二,我们或许需要在 Worker 实现中引用外部库,或者使用 TypeScript 编写 Worker 实现。而对于这些需求,直接裸写 js 的方式是无法实现的。

对此,我们需要对 Next.js 的 webpack 配置进行一些修改,以支持 Worker 的引入。

首先安装 worker-loader

npm i -D worker-loader

修改 next.config.js

/** @type {import('next').NextConfig} */

let nextConfig = {
  webpack: (config, { webpack }) => {
    config.module.rules.unshift({
      test: /\.worker\.ts$/,
      loader: 'worker-loader',
      options: {
        publicPath: '/_next/',
        worker: {
          type: 'SharedWorker',
          // https://v4.webpack.js.org/loaders/worker-loader/#worker
          options: {
            name: 'ws-worker',
          },
        },
      },
    })

    return config
  },
}

export default nextConfig

这里我们使用了 worker-loader,并且配置了文件后缀 .worker.tsSharedWorker 实现,这样我们就可以在 Next.js 中使用 SharedWorker 了。

const worker = new SharedWorker("/worker.js"); // ![code --]
import worker from './worker.worker' // ![code ++]

实现 SharedWorker Socket#

现在我们在 Worker 中实现 Socket 的逻辑。

大概梳理一下流程:

  1. 首先在建立 Worker 时候(也就是第一个页面会建立 Worker),传递 Socket 连接配置。(因为 Worker 中无法获得环境变量和主线程变量)
  2. 创建 Socket 实例,然后对 Socket 连接状态和消息事件监听,通过 MessageChannel 传递消息给主线程(页面)。
  3. 在新页面打开时,或者 Socket 连接完成后,传递 Socket 相关信息到主线程,这些数据需要被主进程存储并用于其他组件消费。
  4. 在主进程中,实现和 Worker 的通信,以及对 Socket 实例的操作。比如 emit 方法。
Mermaid Loading...

具体的代码实现如下:

Note

下面的例子中,我们使用了 TypeScript 去编写 Worker,并且可以直接在 Worker 中使用 import 引用外部库,为了更好的支持 Worker Scope 的类型,所以在顶部我们使用 /// <reference lib="webworker" /> 增加相关的类型支持。

import { io } from 'socket.io-client'
import type { Socket } from 'socket.io-client'

/// <reference lib="webworker" />

let ws: Socket | null = null

function setupIo(config: { url: string }) {
  if (ws) return
  // 使用 socket.io
  console.log('Connecting to io, url: ', config.url)

  ws = io(config.url, {
    timeout: 10000,
    reconnectionDelay: 3000,
    autoConnect: false,
    reconnectionAttempts: 3,
    transports: ['websocket'],
  })
  if (!ws) return

  ws.on('disconnect', () => {
    boardcast({
      type: 'disconnect',
    })
  })

  /**
   * @param {any} payload
   */
  ws.on('message', (payload) => {
    console.log('ws', payload)

    boardcast({
      type: 'message',
      payload,
    })
  })

  ws.on('connect', () => {
    console.log('Connected to ws.io server from SharedWorker')

    if (waitingEmitQueue.length > 0) {
      waitingEmitQueue.forEach((payload) => {
        if (!ws) return
        ws.emit('message', payload)
      })
      waitingEmitQueue.length = 0
    }
    boardcast({
      type: 'connect',
      // @ts-expect-error
      payload: ws.id,
    })
  })

  ws.open()
  boardcast({
    type: 'sid',
    payload: ws.id,
  })
}

const ports = [] as MessagePort[]

self.addEventListener('connect', (ev: any) => {
  const event = ev as MessageEvent

  const port = event.ports[0]

  ports.push(port)

  port.onmessage = (event) => {
    const { type, payload } = event.data
    console.log('get message from main', event.data)

    switch (type) {
      case 'config':
        setupIo(payload)
        break
      case 'emit':
        if (ws) {
          if (ws.connected) ws.emit('message', payload)
          else waitingEmitQueue.push(payload)
        }
        break
      case 'reconnect':
        if (ws) ws.open()
        break
      case 'init':
        port.postMessage({ type: 'ping' })

        if (ws) {
          if (ws.connected) port.postMessage({ type: 'connect' })
          port.postMessage({ type: 'sid', payload: ws.id })
        }
        break
      default:
        console.log('Unknown message type:', type)
    }
  }

  port.start()
})

function boardcast(payload: any) {
  console.log('[ws] boardcast', payload)
  ports.forEach((port) => {
    port.postMessage(payload)
  })
}

const waitingEmitQueue: any[] = []

Worker 写完了,那么现在就要写主线程和 Worker 通信的代码了。

先看看流程:

Mermaid Loading...

具体的代码实现如下:

Important

下面的例子中,我们在 constructor 中异步加载去初始化 SharedWorker 是为了避免在 Server Side 不存在 Worker 的情况下报错。

interface WorkerSocket {
  sid: string
}

class SocketWorker {
  private socket: WorkerSocket | null = null

  worker: SharedWorker | null = null

  constructor() {
    if (isServerSide) return
    // @ts-expect-error
    import('./io.worker').then(({ default: SharedWorker }) => {
      if (isServerSide) return
      const worker = new SharedWorker()

      this.prepare(worker)
      this.worker = worker
    })
  }

  async getSid() {
    return this.socket?.sid
  }

  private setSid(sid: string) {
    this.socket = {
      ...this.socket,
      sid,
    }
  }
  bindMessageHandler = (worker: SharedWorker) => {
    worker.port.onmessage = (event: MessageEvent) => {
      const { data } = event
      const { type, payload } = data

      switch (type) {
        case 'ping': {
          worker?.port.postMessage({
            type: 'pong',
          })
          console.log('[ws worker] pong')
          break
        }
        case 'connect': {
          window.dispatchEvent(new SocketConnectedEvent())
          setSocketIsConnect(true)

          const sid = payload
          this.setSid(sid)
          break
        }
        case 'disconnect': {
          window.dispatchEvent(new SocketDisconnectedEvent())
          setSocketIsConnect(false)
          break
        }
        case 'sid': {
          const sid = payload
          this.setSid(sid)
          break
        }
        case 'message': {
          const typedPayload = payload as string | Record<'type' | 'data', any>
          if (typeof typedPayload !== 'string') {
            return this.handleEvent(
              typedPayload.type,
              camelcaseKeys(typedPayload.data),
            )
          }
          const { data, type } = JSON.parse(typedPayload) as {
            data: any
            type: EventTypes
          }
          this.handleEvent(type, camelcaseKeys(data))
        }
      }
    }
  }

  prepare(worker: SharedWorker) {
    const gatewayUrlWithoutTrailingSlash = GATEWAY_URL.replace(/\/$/, '')
    this.bindMessageHandler(worker)
    worker.port.postMessage({
      type: 'config',

      payload: {
        url: `${gatewayUrlWithoutTrailingSlash}/web`,
      },
    })

    worker.port.start()

    worker.port.postMessage({
      type: 'init',
    })
  }
  handleEvent(type: EventTypes, data: any) {
    // Handle biz event
  }

  emit(event: SocketEmitEnum, payload: any) {
    this.worker?.port.postMessage({
      type: 'emit',
      payload: { type: event, payload },
    })
  }

  reconnect() {
    this.worker?.port.postMessage({
      type: 'reconnect',
    })
  }

  static shared = new SocketWorker()
}

export const socketWorker = SocketWorker.shared
export type TSocketClient = SocketWorker

那么就大功告成了,SocketWorker 基本还是从原有的 SocketClient 中抽象出来的,基本实现了相同的方法,所以在业务中使用没有太大的变化,迁移过程也非常平滑。

对了,这次的重构位于 Shiro/550abd。可供参考。

完成的实现位于:

https://github.com/Innei/Shiro/blob/c399372f7cc1bff55f842ff68342ffb0071b5ae6/src/socket/io.worker.ts

此文由 Mix Space 同步更新至 xLog
原始链接为 https://innei.in/posts/tech/using-sharedworker-singleton-websocket-in-nextjs


Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.