import ZmqClient from '@/lib/zmq/zmqClient'
import {
  WorkerCMD,
  ZmqCMD,
  type PublishMsg,
  type PubMsgData,
} from './zmq'

/** Topic the dedicated heartbeat subscriber listens on. */
const HEARTBEAT_TOPIC = 'HEARTBEAT'
/** Longest heartbeat silence (ms) tolerated before the link is reported as broken. */
const HEARTBEAT_INTERVAL = 3000
/** How often (ms) the age of the last heartbeat is checked. */
const STATUS_CHECK_INTERVAL = 1000
/** How often (ms) tracked messages are checked for expiry. */
const TIMEOUT_SWEEP_INTERVAL = 1000

/** Bookkeeping kept for every published message that asked for timeout tracking. */
interface TraceEntry {
  /** Time (ms since epoch) of the publish or of the latest reply that refreshed the entry. */
  timestamp: number
  /** Topic the message was published on. */
  topic: string
  /** When truthy, replies refresh the entry instead of removing it. */
  isAlwaysListen?: boolean
}

/** Time (ms) a tracked message may stay unanswered; changed by `WorkerCMD.SET_TIMEOUT`. */
let messageTimeout = 20000

let heartClient: ZmqClient | null = null
let subClient: ZmqClient | null = null
let pubClient: ZmqClient | null = null
let subHost = ''
let pubHost = ''

let lastHeartbeatTime = 0
let statusTimerId: ReturnType<typeof setInterval> | null = null
let timeoutTimerId: ReturnType<typeof setInterval> | null = null
let isConnectionError = false

/**
 * Pending tracked messages, keyed by message id.
 * The key is left as `unknown` because the id types live in `./zmq`, outside this file.
 */
const traceMessages = new Map<unknown, TraceEntry>()

/** Decodes a binary frame into a string using `TextDecoder` defaults. */
function decodeMessage(data: Uint8Array) {
  return new TextDecoder().decode(data)
}

/** Heartbeat subscriber callback: records the arrival time of the latest heartbeat. */
function updateHeartbeat() {
  lastHeartbeatTime = Date.now()
}

/**
 * Stores the new connection state and posts a `ZmqCMD.STATUS` message,
 * but only when the state actually changed.
 *
 * @param hasError `true` when the connection is considered broken.
 */
function changeConnectionStatus(hasError: boolean) {
  if (isConnectionError === hasError) return

  isConnectionError = hasError
  console.log('ZMQ连接状态更新:', hasError ? '断开' : '连接')
  postMessage({ cmd: ZmqCMD.STATUS, community: hasError })
}

/**
 * Starts the periodic heartbeat check. Does nothing if the check is already running.
 * The heartbeat clock is reset first so a fresh connection is not flagged immediately.
 */
function monitorConnection() {
  if (statusTimerId) return

  lastHeartbeatTime = Date.now()
  statusTimerId = setInterval(() => {
    const hasError = Date.now() - lastHeartbeatTime > HEARTBEAT_INTERVAL
    changeConnectionStatus(hasError)
  }, STATUS_CHECK_INTERVAL)
}

/** Stops the heartbeat check and resets the reported state to "no error". */
function stopMonitoringConnection() {
  if (statusTimerId) {
    clearInterval(statusTimerId)
    statusTimerId = null
  }
  changeConnectionStatus(false)
}

/**
 * Starts the periodic sweep that expires tracked messages. Does nothing if it is already running.
 *
 * Every entry older than `messageTimeout` is reported with `ZmqCMD.TIMEOUT`
 * (its topic prefix `web/` rewritten to `server/`) and then dropped.
 * The sweep is intentionally not stopped on disconnect, so messages that are
 * still pending at that point keep being reported as timed out.
 */
function startTimeoutSweep() {
  if (timeoutTimerId) return

  timeoutTimerId = setInterval(() => {
    const now = Date.now()
    traceMessages.forEach((entry, id) => {
      if (now - entry.timestamp > messageTimeout) {
        console.warn(`Message ${id} timed out.`)
        postMessage({
          cmd: ZmqCMD.TIMEOUT,
          topic: entry.topic.replace(/^web\//, 'server/'),
          msg: id,
        })
        traceMessages.delete(id)
      }
    })
  }, TIMEOUT_SWEEP_INTERVAL)
}

/** Closes every open client, clears the handles and stops heartbeat monitoring. */
function disconnect() {
  if (subClient) {
    subClient.close(subHost, handleZmqMessage)
    subClient = null
  }
  if (heartClient) {
    heartClient.unsubscribe(HEARTBEAT_TOPIC)
    heartClient.close(subHost, updateHeartbeat)
    heartClient = null
  }
  if (pubClient) {
    pubClient.close(pubHost)
    pubClient = null
  }
  stopMonitoringConnection()
}

/**
 * Subscriber callback: forwards each received message as `ZmqCMD.JSON_MSG`
 * and updates the timeout bookkeeping for tracked message ids.
 *
 * @param topic Raw topic frame.
 * @param msg Raw payload frame, expected to hold JSON text.
 */
function handleZmqMessage(topic: Uint8Array, msg: Uint8Array) {
  try {
    if (!(msg instanceof Uint8Array)) return

    const jsonMessage = decodeMessage(msg)
    // Forward the raw text before parsing so it is delivered even when the JSON is invalid.
    postMessage({ topic: decodeMessage(topic), cmd: ZmqCMD.JSON_MSG, msg: jsonMessage })

    const parsedMessage = JSON.parse(jsonMessage) as PubMsgData
    const id = parsedMessage.id
    if (!id) return

    const entry = traceMessages.get(id)
    if (!entry) return

    if (entry.isAlwaysListen || parsedMessage.result === 'progress') {
      // Reset the message timeout: the request is still being served.
      entry.timestamp = Date.now()
    } else {
      // A final reply arrived; stop tracking this id.
      traceMessages.delete(id)
    }
  } catch (e) {
    console.error('handleZmqMessage error:', e)
  }
}

/**
 * Tears down any existing connection and opens a new one to `host`:
 * a subscriber and a heartbeat subscriber on port 15555, and a publisher on port 15556.
 * Also starts heartbeat monitoring and the message timeout sweep.
 *
 * @param host Host name or address, without scheme or port.
 */
function connect(host: string) {
  disconnect()

  subHost = `ws://${host}:15555`
  subClient = new ZmqClient('sub')
  subClient.zmqSub(subHost, handleZmqMessage)

  heartClient = new ZmqClient('sub')
  heartClient.zmqSub(subHost, updateHeartbeat)
  heartClient.subscribe(HEARTBEAT_TOPIC)
  monitorConnection()

  pubHost = `ws://${host}:15556`
  pubClient = new ZmqClient('pub')
  pubClient.zmqPub(pubHost)

  // Started at most once: repeated connects used to stack one extra interval each time.
  startTimeoutSweep()
}

/**
 * Command dispatcher for messages posted to this script.
 * `event.data` carries `cmd` plus, depending on the command, `topic`, `msg`,
 * `isTimeout` (track the published message for timeout) and `isAlwaysListen`.
 */
self.onmessage = function (event) {
  const { cmd, topic, msg, isTimeout = false, isAlwaysListen } = event.data
  switch (cmd) {
    case WorkerCMD.START:
      connect(msg)
      break
    case WorkerCMD.SET_TIMEOUT:
      messageTimeout = msg
      break
    case WorkerCMD.SUBSCRIBE:
      subClient?.subscribe(topic)
      break
    case WorkerCMD.UNSUBSCRIBE:
      subClient?.unsubscribe(topic)
      break
    case WorkerCMD.PUBLISH:
      if (!msg) throw new Error('msg is required')
      if (isTimeout) {
        // Remember the message id so a missing reply can be reported as a timeout.
        const parseMsg = JSON.parse(msg) as PublishMsg
        traceMessages.set(parseMsg.id, {
          timestamp: Date.now(),
          topic: topic,
          isAlwaysListen,
        })
      }
      pubClient?.publishStr(topic, msg)
      break
    case WorkerCMD.STOP:
      disconnect()
      break
  }
}