From 1bfd461719589c92dbfc4135601a7f34480058e5 Mon Sep 17 00:00:00 2001
From: Timothy Carambat <rambat1010@gmail.com>
Date: Wed, 5 Feb 2025 16:35:22 -0800
Subject: [PATCH] Patch PPLX streaming for timeouts (#3130)

Add in-text citations as well for PPLX token streaming
handle timeouts for stream/buffer hanging
---
 server/utils/AiProviders/perplexity/index.js  | 135 +++++++++++++++++-
 server/utils/AiProviders/perplexity/models.js |  10 ++
 .../perplexity/scripts/chat_models.txt        |   2 +
 3 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/server/utils/AiProviders/perplexity/index.js b/server/utils/AiProviders/perplexity/index.js
index c365c17eb..7126742d8 100644
--- a/server/utils/AiProviders/perplexity/index.js
+++ b/server/utils/AiProviders/perplexity/index.js
@@ -1,6 +1,8 @@
+const { v4: uuidv4 } = require("uuid");
 const { NativeEmbedder } = require("../../EmbeddingEngines/native");
 const {
-  handleDefaultStreamResponseV2,
+  writeResponseChunk,
+  clientAbortedHandler,
 } = require("../../helpers/chat/responses");
 const {
   LLMPerformanceMonitor,
@@ -137,8 +139,137 @@ class PerplexityLLM {
     return measuredStreamRequest;
   }
 
+  enrichToken(token, citations) {
+    if (Array.isArray(citations) && citations.length !== 0) {
+      return token.replace(/\[(\d+)\]/g, (match, index) => {
+        const citationIndex = parseInt(index) - 1;
+        return citations[citationIndex]
+          ? `[[${index}](${citations[citationIndex]})]`
+          : match;
+      });
+    }
+    return token;
+  }
+
   handleStream(response, stream, responseProps) {
-    return handleDefaultStreamResponseV2(response, stream, responseProps);
+    const timeoutThresholdMs = 800;
+    const { uuid = uuidv4(), sources = [] } = responseProps;
+    let hasUsageMetrics = false;
+    let pplxCitations = []; // Array of links
+    let usage = {
+      completion_tokens: 0,
+    };
+
+    return new Promise(async (resolve) => {
+      let fullText = "";
+      let lastChunkTime = null;
+
+      const handleAbort = () => {
+        stream?.endMeasurement(usage);
+        clientAbortedHandler(resolve, fullText);
+      };
+      response.on("close", handleAbort);
+
+      const timeoutCheck = setInterval(() => {
+        if (lastChunkTime === null) return;
+
+        const now = Number(new Date());
+        const diffMs = now - lastChunkTime;
+        if (diffMs >= timeoutThresholdMs) {
+          console.log(
+            `Perplexity stream did not self-close and has been stale for >${timeoutThresholdMs}ms. Closing response stream.`
+          );
+          writeResponseChunk(response, {
+            uuid,
+            sources,
+            type: "textResponseChunk",
+            textResponse: "",
+            close: true,
+            error: false,
+          });
+          clearInterval(timeoutCheck);
+          response.removeListener("close", handleAbort);
+          stream?.endMeasurement(usage);
+          resolve(fullText);
+        }
+      }, 500);
+
+      // Now handle the chunks from the streamed response and append to fullText.
+      try {
+        for await (const chunk of stream) {
+          lastChunkTime = Number(new Date());
+          const message = chunk?.choices?.[0];
+          const token = message?.delta?.content;
+
+          if (Array.isArray(chunk.citations) && chunk.citations.length !== 0) {
+            pplxCitations = chunk.citations;
+          }
+
+          // If we see usage metrics in the chunk, we can use them directly
+          // instead of estimating them, but we only want to assign values if
+          // the response object is the exact same key:value pair we expect.
+          if (
+            chunk.hasOwnProperty("usage") && // exists
+            !!chunk.usage && // is not null
+            Object.values(chunk.usage).length > 0 // has values
+          ) {
+            if (chunk.usage.hasOwnProperty("prompt_tokens")) {
+              usage.prompt_tokens = Number(chunk.usage.prompt_tokens);
+            }
+
+            if (chunk.usage.hasOwnProperty("completion_tokens")) {
+              hasUsageMetrics = true; // to stop estimating counter
+              usage.completion_tokens = Number(chunk.usage.completion_tokens);
+            }
+          }
+
+          if (token) {
+            let enrichedToken = this.enrichToken(token, pplxCitations);
+            fullText += enrichedToken;
+            if (!hasUsageMetrics) usage.completion_tokens++;
+
+            writeResponseChunk(response, {
+              uuid,
+              sources: [],
+              type: "textResponseChunk",
+              textResponse: enrichedToken,
+              close: false,
+              error: false,
+            });
+          }
+
+          if (message?.finish_reason) {
+            console.log("closing");
+            writeResponseChunk(response, {
+              uuid,
+              sources,
+              type: "textResponseChunk",
+              textResponse: "",
+              close: true,
+              error: false,
+            });
+            response.removeListener("close", handleAbort);
+            stream?.endMeasurement(usage);
+            clearInterval(timeoutCheck);
+            resolve(fullText);
+            break; // Break streaming when a valid finish_reason is first encountered
+          }
+        }
+      } catch (e) {
+        console.log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${e.message}`);
+        writeResponseChunk(response, {
+          uuid,
+          type: "abort",
+          textResponse: null,
+          sources: [],
+          close: true,
+          error: e.message,
+        });
+        stream?.endMeasurement(usage);
+        clearInterval(timeoutCheck);
+        resolve(fullText); // Return what we currently have - if anything.
+      }
+    });
   }
 
   // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
diff --git a/server/utils/AiProviders/perplexity/models.js b/server/utils/AiProviders/perplexity/models.js
index 53d3ff748..f035ae28d 100644
--- a/server/utils/AiProviders/perplexity/models.js
+++ b/server/utils/AiProviders/perplexity/models.js
@@ -1,4 +1,14 @@
 const MODELS = {
+  "sonar-reasoning-pro": {
+    id: "sonar-reasoning-pro",
+    name: "sonar-reasoning-pro",
+    maxLength: 127072,
+  },
+  "sonar-reasoning": {
+    id: "sonar-reasoning",
+    name: "sonar-reasoning",
+    maxLength: 127072,
+  },
   "sonar-pro": {
     id: "sonar-pro",
     name: "sonar-pro",
diff --git a/server/utils/AiProviders/perplexity/scripts/chat_models.txt b/server/utils/AiProviders/perplexity/scripts/chat_models.txt
index a19b067b1..44ab9ac30 100644
--- a/server/utils/AiProviders/perplexity/scripts/chat_models.txt
+++ b/server/utils/AiProviders/perplexity/scripts/chat_models.txt
@@ -1,4 +1,6 @@
 | Model                               | Parameter Count | Context Length | Model Type      |
 | :---------------------------------- | :-------------- | :------------- | :-------------- |
+| `sonar-reasoning-pro`               | 8B              | 127,072        | Chat Completion |
+| `sonar-reasoning`                   | 8B              | 127,072        | Chat Completion |
 | `sonar-pro`                         | 8B              | 200,000        | Chat Completion |
 | `sonar`                             | 8B              | 127,072        | Chat Completion |
\ No newline at end of file
--
GitLab
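
Below is a minimal standalone sketch, separate from the applied diff, of the in-text citation enrichment this patch introduces. It copies the enrichToken logic from the hunk above and runs it against an invented token and citation list; the sample text and example.com URLs are hypothetical, chosen only to show how bare [n] markers in a streamed token get rewritten into markdown links pointing at Perplexity's returned citations.

    // Standalone sketch of the enrichToken behavior added in this patch.
    // The sample token text and citation URLs are made up for illustration.
    function enrichToken(token, citations) {
      if (Array.isArray(citations) && citations.length !== 0) {
        return token.replace(/\[(\d+)\]/g, (match, index) => {
          const citationIndex = parseInt(index) - 1;
          return citations[citationIndex]
            ? `[[${index}](${citations[citationIndex]})]`
            : match;
        });
      }
      return token;
    }

    const citations = ["https://example.com/source-a", "https://example.com/source-b"];
    console.log(enrichToken("Sources say so [1][2].", citations));
    // Prints: Sources say so [[1](https://example.com/source-a)][[2](https://example.com/source-b)].

The companion timeout guard in handleStream addresses the hanging-buffer case: every 500ms it compares the current time against the timestamp of the last received chunk and, once the stream has been stale for more than 800ms, writes a final closing chunk, stops measurement, and resolves with whatever text has accumulated, covering streams that never emit a finish_reason.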