memowake-front/lib/websocket-util.ts
Junhui Chen 665ccc6132
All checks were successful
Dev Deploy / Explore-Gitea-Actions (push) Successful in 31s
enhance: ws util健壮性优化
2025-08-09 15:17:22 +08:00

269 lines
9.1 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import Constants from 'expo-constants';
import * as SecureStore from 'expo-secure-store';
import { TFunction } from 'i18next';
import { Platform } from 'react-native';
// 从环境变量或默认值中定义 WebSocket 端点
export const WEBSOCKET_ENDPOINT = process.env.EXPO_PUBLIC_WEBSOCKET_ENDPOINT || Constants.expoConfig?.extra?.WEBSOCKET_ENDPOINT;
export type WebSocketStatus = 'connecting' | 'connected' | 'disconnected' | 'reconnecting';
type StatusListener = (status: WebSocketStatus) => void;
// 消息监听器类型
type MessageListener = (data: any) => void;
// 根据后端 Rust 定义的 WsMessage 枚举创建 TypeScript 类型
export type WsMessage =
| { type: 'Chat', session_id: string, message: string, image_material_ids?: string[], video_material_ids?: string[] }
| { type: 'ChatResponse', session_id: string, message: any, message_id?: string }
| { type: 'ChatStream', session_id: string, chunk: string }
| { type: 'ChatStreamEnd', session_id: string, message: any }
| { type: 'Error', code: string, message: string }
| { type: 'Ping' }
| { type: 'Pong' }
| { type: 'Connected', user_id: number };
class WebSocketManager {
private ws: WebSocket | null = null;
private status: WebSocketStatus = 'disconnected';
private messageListeners: Map<string, Set<(message: WsMessage) => void>> = new Map();
private statusListeners: Set<StatusListener> = new Set();
private reconnectAttempts = 0;
private readonly maxReconnectAttempts = 1;
private readonly reconnectInterval = 1000; // 初始重连间隔为1秒
private pingIntervalId: ReturnType<typeof setInterval> | null = null;
private readonly pingInterval = 30000; // 30秒发送一次心跳
private isConnecting = false; // 防止并发重复连接
private connectTimeoutId: ReturnType<typeof setTimeout> | null = null; // 连接超时
constructor() {
// 这是一个单例类,连接通过调用 connect() 方法来启动
}
/**
* 获取当前 WebSocket 连接状态。
*/
public getStatus(): WebSocketStatus {
return this.status;
}
/**
* 启动 WebSocket 连接。
* 会自动获取并使用存储的认证 token。
*/
public async connect() {
// 已连或正在连,直接返回(基于状态的幂等)
if (this.status === 'connected' || this.status === 'connecting' || this.isConnecting) {
return;
}
this.isConnecting = true;
this.setStatus('connecting');
let token = "";
try {
if (Platform.OS === 'web') {
token = localStorage.getItem('token') || "";
} else {
token = await SecureStore.getItemAsync('token') || "";
}
} catch (e) {
console.error('WebSocket: 获取 token 失败:', e);
}
if (!token) {
console.error('WebSocket: 未找到认证 token无法连接。');
this.isConnecting = false;
this.setStatus('disconnected');
return;
}
const url = `${WEBSOCKET_ENDPOINT}?token=${encodeURIComponent(token)}`;
console.log('WebSocket: 开始连接到服务器');
const ws = new WebSocket(url);
this.ws = ws;
// 连接超时15s
if (this.connectTimeoutId) {
clearTimeout(this.connectTimeoutId);
this.connectTimeoutId = null;
}
this.connectTimeoutId = setTimeout(() => {
if (this.ws === ws && ws.readyState !== WebSocket.OPEN) {
try { ws.close(); } catch { /* noop */ }
}
}, 15000);
ws.onopen = () => {
if (this.connectTimeoutId) {
clearTimeout(this.connectTimeoutId);
this.connectTimeoutId = null;
}
console.log('WebSocket connected');
this.setStatus('connected');
this.reconnectAttempts = 0; // 重置重连尝试次数
this.isConnecting = false;
this.startPing();
};
ws.onmessage = (event) => {
try {
const message: WsMessage = JSON.parse(event.data);
const eventListeners = this.messageListeners.get(message.type);
if (eventListeners) {
eventListeners.forEach(callback => callback(message));
}
} catch (error) {
console.error('处理 WebSocket 消息失败:', error);
}
};
ws.onerror = (error) => {
if (this.connectTimeoutId) {
clearTimeout(this.connectTimeoutId);
this.connectTimeoutId = null;
}
console.error('WebSocket 发生错误:', error);
this.stopPing();
this.isConnecting = false;
};
ws.onclose = () => {
if (this.connectTimeoutId) {
clearTimeout(this.connectTimeoutId);
this.connectTimeoutId = null;
}
console.log('WebSocket disconnected');
this.ws = null;
this.stopPing();
this.isConnecting = false;
// 只有在不是手动断开连接时才重连
if (this.status !== 'disconnected') {
this.setStatus('reconnecting');
this.handleReconnect();
}
};
}
/**
* 处理自动重连逻辑,使用指数退避策略。
*/
private handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectInterval * Math.pow(2, this.reconnectAttempts - 1);
console.log(`${delay / 1000}秒后尝试重新连接 (第 ${this.reconnectAttempts} 次)...`);
setTimeout(() => {
this.connect();
}, delay);
} else {
console.error('WebSocket 重连失败,已达到最大尝试次数。');
this.setStatus('disconnected');
}
}
/**
* 发送消息到 WebSocket 服务器。
* @param message 要发送的消息对象,必须包含 type 字段。
*/
public send(message: WsMessage) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
console.error('WebSocket 未连接或未就绪,无法发送消息。');
return;
}
this.ws.send(JSON.stringify(message));
}
/**
* 订阅指定消息类型的消息。
* @param type 消息类型,例如 'ChatResponse'。
* @param callback 收到消息时的回调函数。
*/
public subscribe(type: WsMessage['type'], callback: (message: WsMessage) => void) {
if (!this.messageListeners.has(type)) {
this.messageListeners.set(type, new Set());
}
this.messageListeners.get(type)?.add(callback);
}
/**
* 取消订阅指定消息类型的消息。
* @param type 消息类型。
* @param callback 要移除的回调函数。
*/
public unsubscribe(type: WsMessage['type'], callback: (message: WsMessage) => void) {
const eventListeners = this.messageListeners.get(type);
if (eventListeners) {
eventListeners.delete(callback);
if (eventListeners.size === 0) {
this.messageListeners.delete(type);
}
}
}
/**
* 手动断开 WebSocket 连接。
*/
public disconnect() {
this.setStatus('disconnected');
if (this.ws) {
this.ws.close();
}
this.stopPing();
}
private setStatus(status: WebSocketStatus) {
if (this.status !== status) {
this.status = status;
this.statusListeners.forEach(listener => listener(status));
}
}
public subscribeStatus(listener: StatusListener) {
this.statusListeners.add(listener);
// Immediately invoke with current status
listener(this.status);
}
public unsubscribeStatus(listener: StatusListener) {
this.statusListeners.delete(listener);
}
/**
* 启动心跳机制。
*/
private startPing() {
this.stopPing(); // 先停止任何可能正在运行的计时器
this.pingIntervalId = setInterval(() => {
this.send({ type: 'Ping' });
}, this.pingInterval);
}
/**
* 停止心跳机制。
*/
private stopPing() {
if (this.pingIntervalId) {
clearInterval(this.pingIntervalId);
this.pingIntervalId = null;
}
}
}
// 导出一个单例,确保整个应用共享同一个 WebSocket 连接
export const webSocketManager = new WebSocketManager();
// webscoket 错误映射
export const getWebSocketErrorMessage = (key: string, t: TFunction) => {
const messages = {
'INSUFFICIENT_POINTS': t('ask:ask.insufficientPoints'),
'INVALID_TOKEN': t('ask:ask.invalidToken'),
'NOT_FOUND': t('ask:ask.notFound'),
'PERMISSION_DENIED': t('ask:ask.permissionDenied'),
'NOT_CONNECTED': t('ask:ask.notConnected'),
};
return messages[key as keyof typeof messages] || t('ask:ask.unknownError');
};