This commit is contained in:
Junhui Chen 2025-08-07 01:16:18 +08:00
parent 918f4da40e
commit 3047215ada
3 changed files with 301 additions and 143 deletions

View File

@ -6,7 +6,7 @@ import { ThemedText } from "@/components/ThemedText";
import { fetchApi } from "@/lib/server-api-util";
import { getWebSocketErrorMessage, getWebSocketManager, WsMessage } from "@/lib/websocket-util";
import { Assistant, Message } from "@/types/ask";
import { useWebSocketSubscription } from "@/hooks/useWebSocketSubscription";
import { useWebSocketStreamHandler } from "@/hooks/useWebSocketStreamHandler";
import { useFocusEffect, useLocalSearchParams, useRouter } from "expo-router";
import { useCallback, useEffect, useRef, useState } from 'react';
import { useTranslation } from "react-i18next";
@ -134,23 +134,26 @@ export default function AskScreen() {
};
}, [isHello, scrollToEnd]);
// 使用新的WebSocket流处理hook使用实时模式
const { subscribeToWebSocket } = useWebSocketStreamHandler({
setUserMessages,
isMounted: isMountedRef.current,
enableBatching: false // AskScreen使用实时模式
});
useFocusEffect(
useCallback(() => {
isMountedRef.current = true;
// 使用新的WebSocket订阅hook
const { subscribeToWebSocket } = useWebSocketSubscription(setUserMessages, isMountedRef.current);
// 订阅WebSocket消息
const unsubscribe = subscribeToWebSocket();
return () => {
// 取消订阅
// 取消订阅和执行清理
unsubscribe();
// 执行清理
cleanup();
};
}, [t, cleanup])
}, [subscribeToWebSocket, cleanup])
);
// 创建动画样式

View File

@ -12,7 +12,8 @@ import {
View
} from 'react-native';
import { getWebSocketManager, WsMessage } from '@/lib/websocket-util';
import { useWebSocketStreamHandler } from '@/hooks/useWebSocketStreamHandler';
import { getWebSocketManager } from '@/lib/websocket-util';
import { Message } from '@/types/ask';
import { useTranslation } from 'react-i18next';
import { ThemedText } from '../ThemedText';
@ -39,147 +40,28 @@ export default function SendMessage(props: Props) {
// 添加组件挂载状态跟踪
const isMountedRef = useRef(true);
const isKeyboardVisible = useRef(false);
const chunkQueue = useRef<string[]>([]);
const renderInterval = useRef<ReturnType<typeof setInterval> | null>(null);
// 清理函数
const cleanup = useCallback(() => {
isMountedRef.current = false;
// 清理定时器
if (renderInterval.current) {
clearInterval(renderInterval.current);
renderInterval.current = null;
}
// 清理队列
chunkQueue.current = [];
}, []);
// 使用新的WebSocket流处理hook启用批量处理模式
const { subscribeToWebSocket, cleanup } = useWebSocketStreamHandler({
setUserMessages,
isMounted: isMountedRef.current,
enableBatching: true,
renderInterval: RENDER_INTERVAL
});
// 使用WebSocket订阅
useEffect(() => {
const handleChatStream = (message: WsMessage) => {
if (!isMountedRef.current || message.type !== 'ChatStream' || !message.chunk) return;
chunkQueue.current.push(message.chunk);
if (!renderInterval.current) {
renderInterval.current = setInterval(() => {
if (!isMountedRef.current) {
if (renderInterval.current) {
clearInterval(renderInterval.current);
renderInterval.current = null;
}
return;
}
if (chunkQueue.current.length > 0) {
const textToRender = chunkQueue.current.join('');
chunkQueue.current = [];
setUserMessages(prevMessages => {
try {
if (prevMessages.length === 0) return prevMessages;
const lastMessage = prevMessages[prevMessages.length - 1];
if (lastMessage.role !== 'assistant') return prevMessages;
const updatedContent = (lastMessage.content === 'keepSearchIng' ? '' : lastMessage.content) + textToRender;
const updatedLastMessage = { ...lastMessage, content: updatedContent };
return [...prevMessages.slice(0, -1), updatedLastMessage];
} catch (error) {
console.error('处理流式消息时出错:', error);
return prevMessages;
}
});
} else {
if (renderInterval.current) {
clearInterval(renderInterval.current);
renderInterval.current = null;
}
}
}, RENDER_INTERVAL);
}
};
const handleChatStreamEnd = (message: WsMessage) => {
if (!isMountedRef.current || message.type !== 'ChatStreamEnd') return;
// Stop the timer and process any remaining chunks
if (renderInterval.current) {
clearInterval(renderInterval.current);
renderInterval.current = null;
}
const remainingText = chunkQueue.current.join('');
chunkQueue.current = [];
setUserMessages(prevMessages => {
try {
if (prevMessages.length === 0) return prevMessages;
const lastMessage = prevMessages[prevMessages.length - 1];
if (lastMessage.role !== 'assistant') return prevMessages;
// Apply remaining chunks from the queue
const contentWithQueue = (lastMessage.content === 'keepSearchIng' ? '' : lastMessage.content) + remainingText;
// Create the final updated message object
const updatedLastMessage = {
...lastMessage,
// Use the final message from ChatStreamEnd if available, otherwise use the content with queued text
content: message.message ? message.message.content : contentWithQueue,
timestamp: message.message ? message.message.timestamp : lastMessage.timestamp,
};
return [...prevMessages.slice(0, -1), updatedLastMessage];
} catch (error) {
console.error('处理ChatStreamEnd消息时出错:', error);
return prevMessages;
}
});
};
const handleChatResponse = (message: WsMessage) => {
if (!isMountedRef.current || message.type !== 'ChatResponse') return;
if (message.message) {
setUserMessages(prevMessages => {
try {
const updatedMessages = [...prevMessages];
updatedMessages[updatedMessages.length - 1] = message.message as Message;
return updatedMessages;
} catch (error) {
console.error('处理聊天响应时出错:', error);
return prevMessages;
}
});
}
};
const typedHandleChatStream = handleChatStream as (message: WsMessage) => void;
const typedHandleChatStreamEnd = handleChatStreamEnd as (message: WsMessage) => void;
const typedHandleChatResponse = handleChatResponse as (message: WsMessage) => void;
const webSocketManager = getWebSocketManager();
webSocketManager.subscribe('ChatStream', typedHandleChatStream);
webSocketManager.subscribe('ChatStreamEnd', typedHandleChatStreamEnd);
webSocketManager.subscribe('ChatResponse', typedHandleChatResponse);
const unsubscribe = subscribeToWebSocket();
return () => {
webSocketManager.unsubscribe('ChatStream', handleChatStream);
webSocketManager.unsubscribe('ChatStreamEnd', handleChatStreamEnd);
webSocketManager.unsubscribe('ChatResponse', handleChatResponse);
// 执行清理
cleanup();
unsubscribe();
};
}, [setUserMessages, cleanup]);
}, [subscribeToWebSocket]);
// 组件卸载时的清理
useEffect(() => {
return () => {
isMountedRef.current = false;
cleanup();
};
}, [cleanup]);
@ -270,7 +152,7 @@ export default function SendMessage(props: Props) {
setUserMessages(prev => prev.filter(item => item.content !== 'keepSearchIng'));
}
}
// 将输入框清空
if (isMountedRef.current) {
setInputValue('');
@ -283,7 +165,7 @@ export default function SendMessage(props: Props) {
const handleQuitly = useCallback((type: string) => {
if (!isMountedRef.current) return;
setIsHello(false);
setUserMessages(pre => ([
...pre,

View File

@ -0,0 +1,273 @@
import { useCallback, useEffect, useRef } from 'react';
import { useTranslation } from 'react-i18next';
import { getWebSocketManager, WsMessage, getWebSocketErrorMessage } from '@/lib/websocket-util';
import { Message, Assistant } from '@/types/ask';
import { Dispatch, SetStateAction } from 'react';
interface UseWebSocketStreamHandlerOptions {
setUserMessages: Dispatch<SetStateAction<Message[]>>;
isMounted: boolean;
enableBatching?: boolean; // 是否启用批量处理
renderInterval?: number; // 渲染间隔默认50ms
}
export const useWebSocketStreamHandler = ({
setUserMessages,
isMounted,
enableBatching = false,
renderInterval = 50
}: UseWebSocketStreamHandlerOptions) => {
const { t } = useTranslation();
const isMountedRef = useRef(isMounted);
// 批量处理相关的refs
const chunkQueue = useRef<string[]>([]);
const renderIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null);
// 更新挂载状态
useEffect(() => {
isMountedRef.current = isMounted;
}, [isMounted]);
// 清理函数
const cleanup = useCallback(() => {
if (renderIntervalRef.current) {
clearInterval(renderIntervalRef.current);
renderIntervalRef.current = null;
}
chunkQueue.current = [];
}, []);
// 批量处理流式消息的函数
const processBatchedChunks = useCallback(() => {
if (!isMountedRef.current) {
cleanup();
return;
}
if (chunkQueue.current.length > 0) {
const textToRender = chunkQueue.current.join('');
chunkQueue.current = [];
setUserMessages(prevMessages => {
try {
if (prevMessages.length === 0) return prevMessages;
const lastMessage = prevMessages[prevMessages.length - 1];
if (lastMessage.role !== Assistant) return prevMessages;
const updatedContent = (lastMessage.content === 'keepSearchIng' ? '' : lastMessage.content as string) + textToRender;
const updatedLastMessage = { ...lastMessage, content: updatedContent };
return [...prevMessages.slice(0, -1), updatedLastMessage];
} catch (error) {
console.error('处理批量流式消息时出错:', error);
return prevMessages;
}
});
} else {
cleanup();
}
}, [setUserMessages, cleanup]);
const handleChatStream = useCallback((message: WsMessage) => {
if (!isMountedRef.current || message.type !== 'ChatStream' || !message.chunk) return;
if (enableBatching) {
// 批量处理模式
chunkQueue.current.push(message.chunk);
if (!renderIntervalRef.current) {
renderIntervalRef.current = setInterval(processBatchedChunks, renderInterval);
}
} else {
// 实时处理模式(原有逻辑)
setUserMessages(prevMessages => {
try {
const lastMessage = prevMessages[prevMessages.length - 1];
if (!lastMessage || lastMessage.role !== Assistant) {
return prevMessages;
}
const newMessages = [...prevMessages];
if (typeof lastMessage.content === 'string') {
if (lastMessage.content === 'keepSearchIng') {
newMessages[newMessages.length - 1] = {
...lastMessage,
content: message.chunk
};
} else {
newMessages[newMessages.length - 1] = {
...lastMessage,
content: lastMessage.content + message.chunk
};
}
} else if (Array.isArray(lastMessage.content)) {
const textPartIndex = lastMessage.content.findIndex(p => p.type === 'text');
if (textPartIndex !== -1) {
const updatedContent = [...lastMessage.content];
updatedContent[textPartIndex] = {
...updatedContent[textPartIndex],
text: (updatedContent[textPartIndex].text || '') + message.chunk
};
newMessages[newMessages.length - 1] = {
...lastMessage,
content: updatedContent
};
}
}
return newMessages;
} catch (error) {
console.error('处理 ChatStream 消息时出错:', error);
return prevMessages;
}
});
}
}, [setUserMessages, enableBatching, processBatchedChunks, renderInterval]);
const handleChatStreamEnd = useCallback((message: WsMessage) => {
if (!isMountedRef.current || message.type !== 'ChatStreamEnd') return;
// 如果是批量模式先处理剩余的chunks
if (enableBatching) {
cleanup();
const remainingText = chunkQueue.current.join('');
chunkQueue.current = [];
setUserMessages(prevMessages => {
try {
if (prevMessages.length === 0) return prevMessages;
const lastMessage = prevMessages[prevMessages.length - 1];
if (lastMessage.role !== Assistant) return prevMessages;
const contentWithQueue = (lastMessage.content === 'keepSearchIng' ? '' : lastMessage.content as string) + remainingText;
const updatedLastMessage = {
...lastMessage,
content: message.message ? message.message.content : contentWithQueue,
timestamp: message.message ? message.message.timestamp : lastMessage.timestamp,
};
return [...prevMessages.slice(0, -1), updatedLastMessage];
} catch (error) {
console.error('处理ChatStreamEnd消息时出错:', error);
return prevMessages;
}
});
} else {
// 实时模式的处理逻辑
setUserMessages(prevMessages => {
try {
const lastMessage = prevMessages[prevMessages.length - 1];
if (!lastMessage || lastMessage.role !== Assistant) {
return prevMessages;
}
if (message.message) {
const newMessages = [...prevMessages];
newMessages[newMessages.length - 1] = message.message as Message;
return newMessages;
} else {
return prevMessages.filter(m =>
!(typeof m.content === 'string' && m.content === 'keepSearchIng')
);
}
} catch (error) {
console.error('处理 ChatStreamEnd 消息时出错:', error);
return prevMessages;
}
});
}
}, [setUserMessages, enableBatching, cleanup]);
const handleChatResponse = useCallback((message: WsMessage) => {
if (!isMountedRef.current || message.type !== 'ChatResponse') return;
if (message.message) {
setUserMessages(prevMessages => {
try {
const updatedMessages = [...prevMessages];
updatedMessages[updatedMessages.length - 1] = message.message as Message;
return updatedMessages;
} catch (error) {
console.error('处理聊天响应时出错:', error);
return prevMessages;
}
});
}
}, [setUserMessages]);
const handleError = useCallback((message: WsMessage) => {
if (!isMountedRef.current || message.type !== 'Error') return;
console.log(`WebSocket Error: ${message.code} - ${message.message}`);
setUserMessages(prevMessages => {
try {
const lastMessage = prevMessages[prevMessages.length - 1];
if (!lastMessage ||
lastMessage.role !== Assistant ||
typeof lastMessage.content !== 'string' ||
lastMessage.content !== 'keepSearchIng') {
return prevMessages;
}
const newMessages = [...prevMessages];
newMessages[newMessages.length - 1] = {
...lastMessage,
content: getWebSocketErrorMessage(message.code, t)
};
return newMessages;
} catch (error) {
console.error('处理 Error 消息时出错:', error);
return prevMessages;
}
});
}, [setUserMessages, t]);
const subscribeToWebSocket = useCallback(() => {
const webSocketManager = getWebSocketManager();
webSocketManager.connect();
webSocketManager.subscribe('ChatStream', handleChatStream);
webSocketManager.subscribe('ChatStreamEnd', handleChatStreamEnd);
webSocketManager.subscribe('ChatResponse', handleChatResponse);
webSocketManager.subscribe('Error', handleError);
return () => {
// 清理订阅
webSocketManager.unsubscribe('ChatStream', handleChatStream);
webSocketManager.unsubscribe('ChatStreamEnd', handleChatStreamEnd);
webSocketManager.unsubscribe('ChatResponse', handleChatResponse);
webSocketManager.unsubscribe('Error', handleError);
// 清理批量处理资源
cleanup();
};
}, [handleChatStream, handleChatStreamEnd, handleChatResponse, handleError, cleanup]);
// 组件卸载时的清理
useEffect(() => {
return () => {
cleanup();
};
}, [cleanup]);
return {
subscribeToWebSocket,
handleChatStream,
handleChatStreamEnd,
handleChatResponse,
handleError,
cleanup
};
};