diff --git a/server/utils/AiProviders/genericOpenAi/index.js b/server/utils/AiProviders/genericOpenAi/index.js
index eb020298c424850e1c020c29f6d5dd8c8d9779d0..163780b9e1191cd764390dda03fb60dd76001256 100644
--- a/server/utils/AiProviders/genericOpenAi/index.js
+++ b/server/utils/AiProviders/genericOpenAi/index.js
@@ -3,8 +3,9 @@ const {
   LLMPerformanceMonitor,
 } = require("../../helpers/chat/LLMPerformanceMonitor");
 const {
-  handleDefaultStreamResponseV2,
   formatChatHistory,
+  writeResponseChunk,
+  clientAbortedHandler,
 } = require("../../helpers/chat/responses");
 const { toValidNumber } = require("../../http");
 
@@ -142,6 +143,21 @@ class GenericOpenAiLLM {
     ];
   }
 
+  /**
+   * Parses and prepends reasoning from the response and returns the full text response.
+   * @param {Object} response
+   * @returns {string}
+   */
+  #parseReasoningFromResponse({ message }) {
+    let textResponse = message?.content;
+    if (
+      !!message?.reasoning_content &&
+      message.reasoning_content.trim().length > 0
+    )
+      textResponse = `<think>${message.reasoning_content}</think>${textResponse}`;
+    return textResponse;
+  }
+
   async getChatCompletion(messages = null, { temperature = 0.7 }) {
     const result = await LLMPerformanceMonitor.measureAsyncFunction(
       this.openai.chat.completions
@@ -163,7 +179,7 @@ class GenericOpenAiLLM {
       return null;
 
     return {
-      textResponse: result.output.choices[0].message.content,
+      textResponse: this.#parseReasoningFromResponse(result.output.choices[0]),
      metrics: {
        prompt_tokens: result.output?.usage?.prompt_tokens || 0,
        completion_tokens: result.output?.usage?.completion_tokens || 0,
@@ -191,8 +207,141 @@ class GenericOpenAiLLM {
     return measuredStreamRequest;
   }
 
+  // TODO: This is a copy of the generic handleStream function in responses.js
+  // to specifically handle the DeepSeek reasoning model `reasoning_content` field.
+  // When or if ever possible, we should refactor this to be in the generic function.
   handleStream(response, stream, responseProps) {
-    return handleDefaultStreamResponseV2(response, stream, responseProps);
+    const { uuid = uuidv4(), sources = [] } = responseProps;
+    let hasUsageMetrics = false;
+    let usage = {
+      completion_tokens: 0,
+    };
+
+    return new Promise(async (resolve) => {
+      let fullText = "";
+      let reasoningText = "";
+
+      // Establish listener to early-abort a streaming response
+      // in case things go sideways or the user does not like the response.
+      // We preserve the generated text but continue as if chat was completed
+      // to preserve previously generated content.
+      const handleAbort = () => {
+        stream?.endMeasurement(usage);
+        clientAbortedHandler(resolve, fullText);
+      };
+      response.on("close", handleAbort);
+
+      try {
+        for await (const chunk of stream) {
+          const message = chunk?.choices?.[0];
+          const token = message?.delta?.content;
+          const reasoningToken = message?.delta?.reasoning_content;
+
+          if (
+            chunk.hasOwnProperty("usage") && // exists
+            !!chunk.usage && // is not null
+            Object.values(chunk.usage).length > 0 // has values
+          ) {
+            if (chunk.usage.hasOwnProperty("prompt_tokens")) {
+              usage.prompt_tokens = Number(chunk.usage.prompt_tokens);
+            }
+
+            if (chunk.usage.hasOwnProperty("completion_tokens")) {
+              hasUsageMetrics = true; // to stop estimating counter
+              usage.completion_tokens = Number(chunk.usage.completion_tokens);
+            }
+          }
+
+          // Reasoning models will always return the reasoning text before the token text.
+          if (reasoningToken) {
+            // If the reasoning text is empty (''), we need to initialize it
+            // and send the first chunk of reasoning text.
+            if (reasoningText.length === 0) {
+              writeResponseChunk(response, {
+                uuid,
+                sources: [],
+                type: "textResponseChunk",
+                textResponse: `<think>${reasoningToken}`,
+                close: false,
+                error: false,
+              });
+              reasoningText += `<think>${reasoningToken}`;
+              continue;
+            } else {
+              writeResponseChunk(response, {
+                uuid,
+                sources: [],
+                type: "textResponseChunk",
+                textResponse: reasoningToken,
+                close: false,
+                error: false,
+              });
+              reasoningText += reasoningToken;
+            }
+          }
+
+          // If the reasoning text is not empty, but the reasoning token is empty
+          // and the token text is not empty we need to close the reasoning text and begin sending the token text.
+          if (!!reasoningText && !reasoningToken && token) {
+            writeResponseChunk(response, {
+              uuid,
+              sources: [],
+              type: "textResponseChunk",
+              textResponse: `</think>`,
+              close: false,
+              error: false,
+            });
+            fullText += `${reasoningText}</think>`;
+            reasoningText = "";
+          }
+
+          if (token) {
+            fullText += token;
+            // If we never saw a usage metric, we can estimate them by number of completion chunks
+            if (!hasUsageMetrics) usage.completion_tokens++;
+            writeResponseChunk(response, {
+              uuid,
+              sources: [],
+              type: "textResponseChunk",
+              textResponse: token,
+              close: false,
+              error: false,
+            });
+          }
+
+          if (
+            message?.hasOwnProperty("finish_reason") && // Got valid message and it is an object with finish_reason
+            message.finish_reason !== "" &&
+            message.finish_reason !== null
+          ) {
+            writeResponseChunk(response, {
+              uuid,
+              sources,
+              type: "textResponseChunk",
+              textResponse: "",
+              close: true,
+              error: false,
+            });
+            response.removeListener("close", handleAbort);
+            stream?.endMeasurement(usage);
+            resolve(fullText);
+            break; // Break streaming when a valid finish_reason is first encountered
+          }
+        }
+      } catch (e) {
+        console.log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${e.message}`);
+        writeResponseChunk(response, {
+          uuid,
+          type: "abort",
+          textResponse: null,
+          sources: [],
+          close: true,
+          error: e.message,
+        });
+        stream?.endMeasurement(usage);
+        resolve(fullText);
+      }
+    });
   }
 
   // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
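For reference, the sketch below (not part of the patch) illustrates the non-streaming behavior this change introduces: when a DeepSeek-style `reasoning_content` field is present on the completion message, it is wrapped in `<think>` tags and prepended to the normal `content`. The standalone function mirrors the private `#parseReasoningFromResponse` helper added above; the sample payload is an assumed response shape for illustration, not output captured from a real provider.

// Illustrative sketch only: mirrors the patch's #parseReasoningFromResponse helper.
// The sample choice object below is an assumed DeepSeek-style payload.
function parseReasoningFromResponse({ message }) {
  let textResponse = message?.content;
  if (
    !!message?.reasoning_content &&
    message.reasoning_content.trim().length > 0
  )
    textResponse = `<think>${message.reasoning_content}</think>${textResponse}`;
  return textResponse;
}

const sampleChoice = {
  message: {
    role: "assistant",
    reasoning_content: "The user sent a greeting, so a short greeting back is enough.",
    content: "Hello! How can I help you today?",
  },
};

console.log(parseReasoningFromResponse(sampleChoice));
// -> <think>The user sent a greeting, so a short greeting back is enough.</think>Hello! How can I help you today?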