Browse Source

fix: ZMQ 多个消息订阅统一个主题时回调函数被覆盖问题

main
betaqi 1 day ago
parent
commit
f886d20924
  1. 48
      src/composables/useZMQJsonWorker.ts
  2. 2
      src/utils/zmqJsonWorker.ts

48
src/composables/useZMQJsonWorker.ts

@ -1,4 +1,3 @@ @@ -1,4 +1,3 @@
import { WorkerCMD, ZmqCMD, } from '@/utils/zmq'
import type {
ManualAction,
PublishMsg,
@ -7,8 +6,8 @@ import type { @@ -7,8 +6,8 @@ import type {
TimeoutMsg,
ZmqMessage
} from '@/utils/zmq'
import { WorkerCMD, ZmqCMD, } from '@/utils/zmq'
import webWorker from '@/utils/zmqJsonWorker?worker'
import dayjs from "dayjs";
const env = import.meta.env
let defaultHost = env.VITE_ZMQ_BASE_URL
@ -16,10 +15,14 @@ if (env.VITE_APP_ENV === 'local') { @@ -16,10 +15,14 @@ if (env.VITE_APP_ENV === 'local') {
defaultHost = window.location.hostname === 'localhost' ? env.VITE_ZMQ_BASE_URL : window.location.hostname
}
const SUBDEFAULTKEY = 'default'
type Handler = (msg: SubMsgData | PubMsgData) => void
class ZMQJsonWorker {
private static instance: ZMQJsonWorker | null = null; // ➤ 单例实例
private worker: Worker;
private scribeHandlers: Map<string, (msg: SubMsgData | PubMsgData) => void> = new Map();
private scribeHandlers: Map<string, Map<string, Handler>> = new Map();
private pubTimeoutHandlers: Map<string, (msg: TimeoutMsg) => void> = new Map();
private readonly host: string;
private statusCallback: ((status: string) => void) | null = null;
@ -49,19 +52,32 @@ class ZMQJsonWorker { @@ -49,19 +52,32 @@ class ZMQJsonWorker {
}
subscribe(topic: string, handler: (msg: any) => void, id?: string) {
this.scribeHandlers.set(`${topic}${id ? `-${id}` : ''}`, handler);
const key = id ?? SUBDEFAULTKEY;
let topicMap = this.scribeHandlers.get(topic);
if (!topicMap) {
topicMap = new Map<string, Handler>();
this.scribeHandlers.set(topic, topicMap);
}
// 添加 handler,不会覆盖其他 id 的 handler
topicMap.set(key, handler);
this.worker.postMessage({ cmd: WorkerCMD.SUBSCRIBE, topic });
}
unsubscribe(topic: string) {
// 遍历所有订阅消息,删除包含该主题的订阅
for (const key in this.scribeHandlers) {
if (key.startsWith(topic)) {
this.scribeHandlers.delete(key);
}
unsubscribe(topic: string, id?: string) {
const topicMap = this.scribeHandlers.get(topic);
if (!topicMap) return;
if (id) {
topicMap.delete(id);
} else {
topicMap.delete(SUBDEFAULTKEY);
}
this.worker.postMessage({ cmd: WorkerCMD.UNSUBSCRIBE, topic });
if (topicMap.size === 0) {
this.scribeHandlers.delete(topic);
this.worker.postMessage({ cmd: WorkerCMD.UNSUBSCRIBE, topic });
}
}
publish<T extends ManualAction>(topic: string, msg: PublishMsg<T>, isTimeout: boolean = false, handler?: (msg: TimeoutMsg) => void, isAlwaysListen: boolean = false) {
@ -90,14 +106,16 @@ class ZMQJsonWorker { @@ -90,14 +106,16 @@ class ZMQJsonWorker {
}
private handleSubscribeMessage(topic: string, json: PubMsgData & SubMsgData) {
const handler = this.scribeHandlers.get(topic);
if (handler) {
const topicMap = this.scribeHandlers.get(topic);
if (!topicMap) return;
topicMap.forEach((handler, id) => {
try {
handler(json);
} catch (error) {
console.error(`主题: ${topic} 处理失败:`, error);
console.error(`主题: ${topic} 的 handler ${id} 执行失败:`, error);
}
}
});
}
private handleTimeoutMessage(timeoutTopic: string, timeoutId: string) {

2
src/utils/zmqJsonWorker.ts

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
import ZmqClient from '@/lib/zmq/zmqClient'
import { WorkerCMD, ZmqCMD, type PublishMsg, type PubMsgData, } from './zmq'
import { type PublishMsg, type PubMsgData, WorkerCMD, ZmqCMD, } from './zmq'
const HEARTBEAT_TOPIC = 'HEARTBEAT'

Loading…
Cancel
Save