From f886d209248ac7597ed231eba04a8b704a75aac8 Mon Sep 17 00:00:00 2001 From: betaqi <3188864257@qq.com> Date: Wed, 5 Nov 2025 13:45:01 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20ZMQ=20=E5=A4=9A=E4=B8=AA=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E8=AE=A2=E9=98=85=E7=BB=9F=E4=B8=80=E4=B8=AA=E4=B8=BB?= =?UTF-8?q?=E9=A2=98=E6=97=B6=E5=9B=9E=E8=B0=83=E5=87=BD=E6=95=B0=E8=A2=AB?= =?UTF-8?q?=E8=A6=86=E7=9B=96=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/composables/useZMQJsonWorker.ts | 48 ++++++++++++++++++++--------- src/utils/zmqJsonWorker.ts | 2 +- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/src/composables/useZMQJsonWorker.ts b/src/composables/useZMQJsonWorker.ts index 7aeb5b1..945d4ff 100644 --- a/src/composables/useZMQJsonWorker.ts +++ b/src/composables/useZMQJsonWorker.ts @@ -1,4 +1,3 @@ -import { WorkerCMD, ZmqCMD, } from '@/utils/zmq' import type { ManualAction, PublishMsg, @@ -7,8 +6,8 @@ import type { TimeoutMsg, ZmqMessage } from '@/utils/zmq' +import { WorkerCMD, ZmqCMD, } from '@/utils/zmq' import webWorker from '@/utils/zmqJsonWorker?worker' -import dayjs from "dayjs"; const env = import.meta.env let defaultHost = env.VITE_ZMQ_BASE_URL @@ -16,10 +15,14 @@ if (env.VITE_APP_ENV === 'local') { defaultHost = window.location.hostname === 'localhost' ? env.VITE_ZMQ_BASE_URL : window.location.hostname } +const SUBDEFAULTKEY = 'default' + +type Handler = (msg: SubMsgData | PubMsgData) => void + class ZMQJsonWorker { private static instance: ZMQJsonWorker | null = null; // ➤ 单例实例 private worker: Worker; - private scribeHandlers: Map void> = new Map(); + private scribeHandlers: Map> = new Map(); private pubTimeoutHandlers: Map void> = new Map(); private readonly host: string; private statusCallback: ((status: string) => void) | null = null; @@ -49,19 +52,32 @@ class ZMQJsonWorker { } subscribe(topic: string, handler: (msg: any) => void, id?: string) { - this.scribeHandlers.set(`${topic}${id ? `-${id}` : ''}`, handler); + const key = id ?? SUBDEFAULTKEY; + let topicMap = this.scribeHandlers.get(topic); + if (!topicMap) { + topicMap = new Map(); + this.scribeHandlers.set(topic, topicMap); + } + + // 添加 handler,不会覆盖其他 id 的 handler + topicMap.set(key, handler); this.worker.postMessage({ cmd: WorkerCMD.SUBSCRIBE, topic }); } - unsubscribe(topic: string) { - // 遍历所有订阅消息,删除包含该主题的订阅 - for (const key in this.scribeHandlers) { - if (key.startsWith(topic)) { - this.scribeHandlers.delete(key); - } + unsubscribe(topic: string, id?: string) { + const topicMap = this.scribeHandlers.get(topic); + if (!topicMap) return; + + if (id) { + topicMap.delete(id); + } else { + topicMap.delete(SUBDEFAULTKEY); } - this.worker.postMessage({ cmd: WorkerCMD.UNSUBSCRIBE, topic }); + if (topicMap.size === 0) { + this.scribeHandlers.delete(topic); + this.worker.postMessage({ cmd: WorkerCMD.UNSUBSCRIBE, topic }); + } } publish(topic: string, msg: PublishMsg, isTimeout: boolean = false, handler?: (msg: TimeoutMsg) => void, isAlwaysListen: boolean = false) { @@ -90,14 +106,16 @@ class ZMQJsonWorker { } private handleSubscribeMessage(topic: string, json: PubMsgData & SubMsgData) { - const handler = this.scribeHandlers.get(topic); - if (handler) { + const topicMap = this.scribeHandlers.get(topic); + if (!topicMap) return; + + topicMap.forEach((handler, id) => { try { handler(json); } catch (error) { - console.error(`主题: ${topic} 处理失败:`, error); + console.error(`主题: ${topic} 的 handler ${id} 执行失败:`, error); } - } + }); } private handleTimeoutMessage(timeoutTopic: string, timeoutId: string) { diff --git a/src/utils/zmqJsonWorker.ts b/src/utils/zmqJsonWorker.ts index 0d63cd0..022b3b0 100644 --- a/src/utils/zmqJsonWorker.ts +++ b/src/utils/zmqJsonWorker.ts @@ -1,5 +1,5 @@ import ZmqClient from '@/lib/zmq/zmqClient' -import { WorkerCMD, ZmqCMD, type PublishMsg, type PubMsgData, } from './zmq' +import { type PublishMsg, type PubMsgData, WorkerCMD, ZmqCMD, } from './zmq' const HEARTBEAT_TOPIC = 'HEARTBEAT'