You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

167 lines
4.3 KiB

4 months ago
import ZmqClient from '@/lib/zmq/zmqClient'
import { WorkerCMD, ZmqCMD, type PublishMsg, type PubMsgData, } from './zmq'
/** Topic the dedicated heartbeat subscriber listens on. */
const HEARTBEAT_TOPIC = 'HEARTBEAT'
/** Longest allowed gap (ms) since the last heartbeat before the link is reported as broken. */
const HEARTBEAT_INTERVAL = 3000
/** Period (ms) of the timer that re-evaluates the connection status. */
const STATUS_CHECK_INTERVAL = 1000
/** Age (ms) after which a traced message is reported as timed out; replaced by WorkerCMD.SET_TIMEOUT. */
let messageTimeout = 10000

// Client handles. Explicitly null while disconnected so the runtime value
// matches the declared `ZmqClient | null` type instead of being undefined.
let heartClient: ZmqClient | null = null
let subClient: ZmqClient | null = null
let pubClient: ZmqClient | null = null

// WebSocket endpoints of the current connection (empty until connect() runs).
let subHost = ''
let pubHost = ''

/** Epoch milliseconds at which the last heartbeat was recorded. */
let lastHeartbeatTime = 0
/** Handle of the status-check interval, or null when monitoring is stopped. */
let statusTimerId: ReturnType<typeof setInterval> | null = null
/** Last connection state that was reported to the main thread (true = broken). */
let isConnectionError = false
/**
 * Turn a raw byte buffer into text.
 *
 * A fresh TextDecoder with its default encoding (UTF-8) is used for each call.
 *
 * @param data Bytes to decode.
 * @returns The decoded string.
 */
function decodeMessage(data: Uint8Array): string {
  const decoder = new TextDecoder()
  const text = decoder.decode(data)
  return text
}
/**
 * Stamp `lastHeartbeatTime` with the current time.
 *
 * Registered as the message callback of the heartbeat subscriber, so every
 * received heartbeat refreshes the timestamp the status check compares against.
 */
function updateHeartbeat(): void {
  const receivedAt = Date.now()
  lastHeartbeatTime = receivedAt
}
/**
 * Record a new connection state and notify the main thread, but only when the
 * state actually differs from the one last reported.
 *
 * @param hasError true when the connection is considered broken.
 */
function changeConnectionStatus(hasError: boolean): void {
  // Nothing to report when the state is unchanged.
  if (isConnectionError === hasError) {
    return
  }
  isConnectionError = hasError
  const stateLabel = hasError ? '断开' : '连接'
  console.log('ZMQ连接状态更新:', stateLabel)
  // The payload key `community` is part of the message protocol; keep it as is.
  postMessage({ cmd: ZmqCMD.STATUS, community: hasError })
}
/**
 * Start the periodic connection check if it is not already running.
 *
 * The heartbeat timestamp is reset to "now" first; afterwards, every
 * STATUS_CHECK_INTERVAL ms, the connection is reported as broken when more
 * than HEARTBEAT_INTERVAL ms have passed since the last heartbeat.
 */
function monitorConnection(): void {
  if (statusTimerId) return

  lastHeartbeatTime = Date.now()

  const checkHeartbeat = () => {
    const silentFor = Date.now() - lastHeartbeatTime
    changeConnectionStatus(silentFor > HEARTBEAT_INTERVAL)
  }
  statusTimerId = setInterval(checkHeartbeat, STATUS_CHECK_INTERVAL)
}
/**
 * Stop the periodic connection check (if one is running) and reset the
 * reported state to "no error".
 */
function stopMonitoringConnection(): void {
  const activeTimer = statusTimerId
  if (activeTimer) {
    clearInterval(activeTimer)
    statusTimerId = null
  }
  // Emits a status message only if an error had been reported before.
  changeConnectionStatus(false)
}
/**
 * Tear down every open client and stop connection monitoring.
 *
 * Clients are closed in the order subscriber, heartbeat subscriber, publisher;
 * each handle is set to null once closed. Safe to call when nothing is connected.
 */
function disconnect(): void {
  const subscriber = subClient
  if (subscriber) {
    subscriber.close(subHost, handleZmqMessage)
    subClient = null
  }

  const heartbeat = heartClient
  if (heartbeat) {
    // Drop the heartbeat subscription before closing its socket.
    heartbeat.unsubscribe(HEARTBEAT_TOPIC)
    heartbeat.close(subHost, updateHeartbeat)
    heartClient = null
  }

  const publisher = pubClient
  if (publisher) {
    publisher.close(pubHost)
    pubClient = null
  }

  stopMonitoringConnection()
}
/**
 * Callback for the main subscriber: forward a received message to the main
 * thread and update the timeout bookkeeping for traced messages.
 *
 * The decoded payload is always posted (as text) before it is parsed, so a
 * payload that is not valid JSON is still delivered; the parse failure is
 * only logged.
 *
 * @param topic Raw topic bytes of the received message.
 * @param msg   Raw payload bytes; expected to decode to a JSON document.
 */
function handleZmqMessage(topic: Uint8Array, msg: Uint8Array) {
  try {
    if (!(msg instanceof Uint8Array)) return

    const jsonMessage = decodeMessage(msg)
    postMessage({
      topic: decodeMessage(topic),
      cmd: ZmqCMD.JSON_MSG,
      msg: jsonMessage
    })

    const parsedMessage = JSON.parse(jsonMessage) as PubMsgData
    if (!parsedMessage.id) return

    const tracked = traceMessages.get(parsedMessage.id)
    if (!tracked) return

    if (tracked.isAlwaysListen || parsedMessage.result === 'progress') {
      // Reset the message's timeout clock: more replies are expected.
      // The entry is mutated in place, so it does not need to be re-inserted.
      tracked.timestamp = Date.now()
    } else {
      // Final reply received: stop tracing this message.
      traceMessages.delete(parsedMessage.id)
    }
  } catch (e) {
    console.error('handleZmqMessage error:', e)
  }
}
/** Period (ms) between scans of the traced-message table for timeouts. */
const TRACE_SWEEP_INTERVAL = 1000
/** Handle of the running timeout sweep, or null when none has been started. */
let traceSweepTimerId: ReturnType<typeof setInterval> | null = null

/**
 * Report and drop every traced message that has been waiting longer than
 * `messageTimeout`. The reported topic has a leading `web/` rewritten to `server/`.
 */
function sweepTimedOutMessages() {
  const now = Date.now()
  traceMessages.forEach((val, id) => {
    if (now - val.timestamp > messageTimeout) {
      console.warn(`Message ${id} timed out.`)
      postMessage({
        cmd: ZmqCMD.TIMEOUT,
        topic: val.topic.replace(/^web\//, 'server/'),
        msg: id
      })
      traceMessages.delete(id)
    }
  })
}

/**
 * (Re)connect to the ZMQ bridge on `host`.
 *
 * Any existing connection is torn down first. Then a subscriber and a
 * heartbeat subscriber are opened on `ws://<host>:15555`, connection
 * monitoring is started, a publisher is opened on `ws://<host>:15556`, and
 * the periodic timeout sweep for traced messages is (re)started.
 *
 * @param host Host name or address of the bridge, without scheme or port.
 */
function connect(host: string) {
  disconnect()

  subHost = `ws://${host}:15555`
  subClient = new ZmqClient('sub')
  subClient.zmqSub(subHost, handleZmqMessage)

  heartClient = new ZmqClient('sub')
  heartClient.zmqSub(subHost, updateHeartbeat)
  heartClient.subscribe(HEARTBEAT_TOPIC)
  monitorConnection()

  pubHost = `ws://${host}:15556`
  pubClient = new ZmqClient('pub')
  pubClient.zmqPub(pubHost)

  // Replace, rather than stack, the sweep timer: without this every
  // reconnect would leak another interval running the same scan.
  if (traceSweepTimerId !== null) {
    clearInterval(traceSweepTimerId)
  }
  traceSweepTimerId = setInterval(sweepTimedOutMessages, TRACE_SWEEP_INTERVAL)
}
/**
 * Published messages that are being watched for a reply, keyed by message id.
 *
 * Entries are added on publish (holding `timestamp`, `topic` and
 * `isAlwaysListen`) and removed when a final reply arrives or the message
 * times out.
 */
const traceMessages: Map<string, any> = new Map()
/**
 * Worker entry point: dispatch a command sent by the main thread.
 *
 * Fields read from `event.data`:
 * - `cmd`            one of the WorkerCMD values handled below
 * - `topic`          topic for SUBSCRIBE / UNSUBSCRIBE / PUBLISH
 * - `msg`            host for START, timeout in ms for SET_TIMEOUT,
 *                    serialized payload for PUBLISH
 * - `isTimeout`      PUBLISH only: trace the message so a missing reply is
 *                    reported (defaults to false)
 * - `isAlwaysListen` PUBLISH only: stored with the traced entry
 *
 * Unknown commands are ignored. PUBLISH throws when `msg` is empty, and
 * when tracing is requested but `msg` is not valid JSON.
 */
self.onmessage = function (event) {
  const { cmd, topic, msg, isTimeout = false, isAlwaysListen } = event.data
  switch (cmd) {
    case WorkerCMD.START:
      connect(msg)
      break
    case WorkerCMD.SET_TIMEOUT:
      messageTimeout = msg
      break
    case WorkerCMD.SUBSCRIBE:
      subClient?.subscribe(topic)
      break
    case WorkerCMD.UNSUBSCRIBE:
      subClient?.unsubscribe(topic)
      break
    case WorkerCMD.PUBLISH:
      if (!msg) throw new Error('msg is required')
      if (isTimeout) {
        // Register the message before sending so a reply cannot race the bookkeeping.
        const parseMsg = JSON.parse(msg) as PublishMsg<string>
        traceMessages.set(parseMsg.id, {
          timestamp: Date.now(),
          topic: topic,
          isAlwaysListen
        })
      }
      pubClient?.publishStr(topic, msg)
      break
    case WorkerCMD.STOP:
      disconnect()
      break
  }
}