You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
165 lines
4.1 KiB
165 lines
4.1 KiB
4 months ago
|
import ZmqClient from '@/lib/zmq/zmqClient'
|
||
|
import { WorkerCMD, ZmqCMD, type PublishMsg, type PubMsgData, } from './zmq'
|
||
|
|
||
|
|
||
|
/** Topic the dedicated heartbeat subscriber listens on. */
const HEARTBEAT_TOPIC = 'HEARTBEAT'
/** Maximum gap (ms) since the last heartbeat before the connection is reported as broken. */
const HEARTBEAT_INTERVAL = 3000
/** Period (ms) of the timer that re-evaluates the connection status. */
const STATUS_CHECK_INTERVAL = 1000
/** Age (ms) after which a traced message is reported as timed out; replaced by WorkerCMD.SET_TIMEOUT. */
let messageTimeout = 10000

// The client handles are explicitly null while disconnected so the runtime
// value matches the declared `ZmqClient | null` type (previously `undefined`).
let heartClient: ZmqClient | null = null
let subClient: ZmqClient | null = null
let pubClient: ZmqClient | null = null
/** WebSocket URLs of the subscribe and publish endpoints, filled in by connect(). */
let subHost = '', pubHost = ''
/** Timestamp (ms since epoch) of the most recent heartbeat. */
let lastHeartbeatTime = 0
/** Handle of the connection-status timer, or null when monitoring is stopped. */
let statusTimerId: ReturnType<typeof setInterval> | null = null
/** Last connection-error state that was reported to the main thread. */
let isConnectionError = false
|
||
|
|
||
|
// One decoder is shared by every call; decode() without the `stream` option
// keeps no state between calls, so reuse is safe.
const utf8Decoder = new TextDecoder()

/**
 * Decodes a raw byte frame into a string using TextDecoder's default (UTF-8) encoding.
 *
 * @param data - Bytes to decode.
 * @returns The decoded text.
 */
function decodeMessage(data: Uint8Array): string {
  return utf8Decoder.decode(data)
}
|
||
|
|
||
|
/**
 * Records the current time as the moment of the latest heartbeat.
 * Registered as the message callback of the heartbeat subscriber.
 */
function updateHeartbeat() {
  lastHeartbeatTime = Date.now()
}
|
||
|
|
||
|
/**
 * Stores a new connection-error state and notifies the main thread.
 * Does nothing when the state is unchanged, so each transition is logged
 * and posted exactly once.
 *
 * @param hasError - true when the connection is considered broken.
 */
function changeConnectionStatus(hasError: boolean) {
  if (isConnectionError === hasError) return

  isConnectionError = hasError
  console.log('ZMQ连接状态更新:', hasError ? '断开' : '连接')
  // The error flag travels in the `community` field of the STATUS message.
  postMessage({ cmd: ZmqCMD.STATUS, community: hasError })
}
|
||
|
|
||
|
/**
 * Starts the periodic connection check unless one is already running.
 * Every STATUS_CHECK_INTERVAL ms the time since the last heartbeat is compared
 * with HEARTBEAT_INTERVAL and the result is passed to changeConnectionStatus().
 */
function monitorConnection() {
  if (statusTimerId) return

  // Start the clock now so the first checks do not report an error immediately.
  lastHeartbeatTime = Date.now()
  statusTimerId = setInterval(() => {
    const elapsed = Date.now() - lastHeartbeatTime
    changeConnectionStatus(elapsed > HEARTBEAT_INTERVAL)
  }, STATUS_CHECK_INTERVAL)
}
|
||
|
|
||
|
/**
 * Stops the periodic connection check, if running, and resets the reported
 * connection state to "no error".
 */
function stopMonitoringConnection() {
  if (statusTimerId) {
    clearInterval(statusTimerId)
    statusTimerId = null
  }
  changeConnectionStatus(false)
}
|
||
|
|
||
|
/**
 * Closes whichever of the subscriber, heartbeat and publisher clients exist,
 * clears their handles, and stops connection monitoring.
 * Safe to call when nothing is connected.
 */
function disconnect() {
  if (subClient) {
    subClient.close(subHost, handleZmqMessage)
    subClient = null
  }

  if (heartClient) {
    // Drop the heartbeat subscription before closing the socket.
    heartClient.unsubscribe(HEARTBEAT_TOPIC)
    heartClient.close(subHost, updateHeartbeat)
    heartClient = null
  }

  if (pubClient) {
    pubClient.close(pubHost)
    pubClient = null
  }

  stopMonitoringConnection()
}
|
||
|
|
||
|
/**
 * Callback for messages received on the main subscriber.
 *
 * Forwards the decoded topic and payload to the main thread as a
 * ZmqCMD.JSON_MSG message, then parses the payload as JSON to update the
 * traced-message table: a reply whose `result` is 'progress' refreshes the
 * entry's timestamp, any other reply for a traced id removes the entry.
 * Errors (for example a payload that is not valid JSON) are logged, not thrown.
 *
 * @param topic - Raw topic bytes.
 * @param msg - Raw payload bytes; anything that is not a Uint8Array is ignored.
 */
function handleZmqMessage(topic: Uint8Array, msg: Uint8Array) {
  try {
    if (!(msg instanceof Uint8Array)) return

    const jsonMessage = decodeMessage(msg)
    // Forward first, so the main thread receives the message even if parsing fails below.
    postMessage({
      topic: decodeMessage(topic),
      cmd: ZmqCMD.JSON_MSG,
      msg: jsonMessage
    })

    const parsedMessage = JSON.parse(jsonMessage) as PubMsgData
    const id = parsedMessage.id
    if (!id || !traceMessages.has(id)) return

    if (parsedMessage.result === 'progress') {
      // Reset the message timeout: the entry is mutated in place, so it does
      // not need to be stored again.
      const entry = traceMessages.get(id)
      if (entry) {
        entry.timestamp = Date.now()
      }
    } else {
      traceMessages.delete(id)
    }
  } catch (e) {
    console.error('handleZmqMessage error:', e)
  }
}
|
||
|
|
||
|
/** Handle of the 1 s sweep that reports overdue traced messages; null until the first connect(). */
let timeoutSweepTimerId: ReturnType<typeof setInterval> | null = null

/**
 * Connects to the ZMQ WebSocket endpoints on `host`.
 *
 * Any existing connections are closed first. A subscriber and a separate
 * heartbeat subscriber are opened on port 15555, connection monitoring is
 * started, and a publisher is opened on port 15556. The sweep that reports
 * timed-out traced messages is started on the first call only, so repeated
 * calls no longer stack additional timers.
 *
 * @param host - Host name or address, without scheme or port.
 */
function connect(host: string) {
  disconnect()

  subHost = `ws://${host}:15555`
  subClient = new ZmqClient('sub')
  subClient.zmqSub(subHost, handleZmqMessage)

  // The heartbeat uses its own client on the same endpoint, with its own callback.
  heartClient = new ZmqClient('sub')
  heartClient.zmqSub(subHost, updateHeartbeat)
  heartClient.subscribe(HEARTBEAT_TOPIC)

  monitorConnection()

  pubHost = `ws://${host}:15556`
  pubClient = new ZmqClient('pub')
  pubClient.zmqPub(pubHost)

  // The sweep does not depend on `host`, so one timer serves every connection.
  if (timeoutSweepTimerId === null) {
    timeoutSweepTimerId = setInterval(() => {
      const now = Date.now()
      traceMessages.forEach((val, id) => {
        if (now - val.timestamp > messageTimeout) {
          console.warn(`Message ${id} timed out.`)
          postMessage({
            cmd: ZmqCMD.TIMEOUT,
            // Report the timeout under the topic with its `web/` prefix replaced by `server/`.
            topic: val.topic.replace(/^web\//, 'server/'),
            msg: id
          })
          traceMessages.delete(id)
        }
      })
    }, 1000)
  }
}
|
||
|
|
||
|
/**
 * Published messages awaiting a reply, keyed by message id.
 * Each value holds the `timestamp` (ms) of the last activity and the publish `topic`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- entry shape is not declared in this file
const traceMessages: Map<string, any> = new Map()
|
||
|
|
||
|
/**
 * Dispatches commands sent by the main thread.
 *
 * `event.data` carries `cmd`, and depending on the command `topic`, `msg` and
 * `isTimeout` (defaults to false):
 * - START: connect to the host given in `msg`.
 * - SET_TIMEOUT: replace the traced-message timeout with `msg`.
 * - SUBSCRIBE / UNSUBSCRIBE: forwarded to the subscriber, if connected.
 * - PUBLISH: publish `msg` on `topic`; when `isTimeout` is set, `msg` is parsed
 *   as JSON and its `id` is tracked so a missing reply can be reported.
 * - STOP: close all connections.
 *
 * Throws when PUBLISH is received with an empty `msg`; unknown commands are ignored.
 */
self.onmessage = function (event) {
  const { cmd, topic, msg, isTimeout = false } = event.data

  switch (cmd) {
    case WorkerCMD.START:
      connect(msg)
      break
    case WorkerCMD.SET_TIMEOUT:
      messageTimeout = msg
      break
    case WorkerCMD.SUBSCRIBE:
      subClient?.subscribe(topic)
      break
    case WorkerCMD.UNSUBSCRIBE:
      subClient?.unsubscribe(topic)
      break
    case WorkerCMD.PUBLISH: {
      if (!msg) throw new Error('msg is required')
      if (isTimeout) {
        // Register the message before sending so a reply can be matched to it.
        const parseMsg = JSON.parse(msg) as PublishMsg<string>
        traceMessages.set(parseMsg.id, {
          timestamp: Date.now(),
          topic: topic
        })
      }
      pubClient?.publishStr(topic, msg)
      break
    }
    case WorkerCMD.STOP:
      disconnect()
      break
  }
}
|