Unverified Commit 1bfd4617 authored by Timothy Carambat, committed by GitHub

Patch PPLX streaming for timeouts (#3130)

Add in-text citations for PPLX token streaming
Handle timeouts for stream/buffer hanging
parent df8d34d3
const { v4: uuidv4 } = require("uuid");
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
  writeResponseChunk,
  clientAbortedHandler,
} = require("../../helpers/chat/responses");
const {
  LLMPerformanceMonitor,
@@ -137,8 +139,137 @@ class PerplexityLLM {
    return measuredStreamRequest;
  }
  /**
   * Injects inline citation links into a streamed token when Perplexity
   * has returned a citations array. Bare references like [1] become
   * markdown links of the form [[1](url)].
   * @param {string} token
   * @param {string[]} citations
   * @returns {string}
   */
  enrichToken(token, citations) {
    if (Array.isArray(citations) && citations.length !== 0) {
      return token.replace(/\[(\d+)\]/g, (match, index) => {
        const citationIndex = parseInt(index) - 1;
        return citations[citationIndex]
          ? `[[${index}](${citations[citationIndex]})]`
          : match;
      });
    }
    return token;
  }
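  // Illustrative example (a sketch, not part of this diff): with
  //   const citations = ["https://example.com/a", "https://example.com/b"];
  // a streamed token is rewritten as
  //   enrichToken("See [1] and [2].", citations)
  //     => "See [[1](https://example.com/a)] and [[2](https://example.com/b)]."
  // An index with no matching citation, e.g. [3], falls through unchanged.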
  handleStream(response, stream, responseProps) {
    // If no new chunk arrives within this window, assume the stream has hung
    // and close the response ourselves.
    const timeoutThresholdMs = 800;
    const { uuid = uuidv4(), sources = [] } = responseProps;
    let hasUsageMetrics = false;
    let pplxCitations = []; // Array of links
    let usage = {
      completion_tokens: 0,
    };
    return new Promise(async (resolve) => {
      let fullText = "";
      let lastChunkTime = null;

      // If the client disconnects early, stop measuring and resolve with
      // whatever text was generated so far.
      const handleAbort = () => {
        stream?.endMeasurement(usage);
        clientAbortedHandler(resolve, fullText);
      };
      response.on("close", handleAbort);
      // Watchdog: Perplexity's stream occasionally never self-closes. Poll the
      // time since the last chunk and, once it goes stale past the threshold,
      // flush a final closing chunk and resolve.
      const timeoutCheck = setInterval(() => {
        if (lastChunkTime === null) return;

        const now = Number(new Date());
        const diffMs = now - lastChunkTime;
        if (diffMs >= timeoutThresholdMs) {
          console.log(
            `Perplexity stream did not self-close and has been stale for >${timeoutThresholdMs}ms. Closing response stream.`
          );
          writeResponseChunk(response, {
            uuid,
            sources,
            type: "textResponseChunk",
            textResponse: "",
            close: true,
            error: false,
          });
          clearInterval(timeoutCheck);
          response.removeListener("close", handleAbort);
          stream?.endMeasurement(usage);
          resolve(fullText);
        }
      }, 500);
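      // With timeoutThresholdMs = 800 and a 500ms poll, a hung stream is
      // closed between ~800ms and ~1300ms after its last chunk. Because
      // lastChunkTime starts as null, the watchdog never fires while still
      // waiting on the first chunk.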
      // Now handle the chunks from the streamed response and append to fullText.
      try {
        for await (const chunk of stream) {
          lastChunkTime = Number(new Date());
          const message = chunk?.choices?.[0];
          const token = message?.delta?.content;

          if (Array.isArray(chunk.citations) && chunk.citations.length !== 0) {
            pplxCitations = chunk.citations;
          }

          // If we see usage metrics in the chunk, we can use them directly
          // instead of estimating them, but we only want to assign values if
          // the response object is the exact same key:value pair we expect.
          if (
            chunk.hasOwnProperty("usage") && // exists
            !!chunk.usage && // is not null
            Object.values(chunk.usage).length > 0 // has values
          ) {
            if (chunk.usage.hasOwnProperty("prompt_tokens")) {
              usage.prompt_tokens = Number(chunk.usage.prompt_tokens);
            }

            if (chunk.usage.hasOwnProperty("completion_tokens")) {
              hasUsageMetrics = true; // to stop estimating counter
              usage.completion_tokens = Number(chunk.usage.completion_tokens);
            }
          }
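          // For reference, Perplexity follows the OpenAI-compatible streaming
          // shape, so a final chunk may carry (illustrative values):
          //   chunk.usage = { prompt_tokens: 25, completion_tokens: 180, total_tokens: 205 }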
          if (token) {
            const enrichedToken = this.enrichToken(token, pplxCitations);
            fullText += enrichedToken;
            // Estimate completion tokens one-per-chunk until real metrics arrive.
            if (!hasUsageMetrics) usage.completion_tokens++;
            writeResponseChunk(response, {
              uuid,
              sources: [],
              type: "textResponseChunk",
              textResponse: enrichedToken,
              close: false,
              error: false,
            });
          }

          if (message?.finish_reason) {
            writeResponseChunk(response, {
              uuid,
              sources,
              type: "textResponseChunk",
              textResponse: "",
              close: true,
              error: false,
            });
            response.removeListener("close", handleAbort);
            stream?.endMeasurement(usage);
            clearInterval(timeoutCheck);
            resolve(fullText);
            break; // Break streaming when a valid finish_reason is first encountered
          }
        }
      } catch (e) {
        console.log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${e.message}`);
        writeResponseChunk(response, {
          uuid,
          type: "abort",
          textResponse: null,
          sources: [],
          close: true,
          error: e.message,
        });
        stream?.endMeasurement(usage);
        clearInterval(timeoutCheck);
        resolve(fullText); // Return what we currently have - if anything.
      }
    });
  }
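  // Illustrative caller (a sketch; `streamGetChatCompletion` and the argument
  // shapes are assumptions drawn from this codebase's provider conventions,
  // not part of this diff):
  //   const stream = await llm.streamGetChatCompletion(messages, { temperature: 0.7 });
  //   const fullText = await llm.handleStream(response, stream, { uuid, sources: [] });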
  // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
  ...
const MODELS = {
  "sonar-reasoning-pro": {
    id: "sonar-reasoning-pro",
    name: "sonar-reasoning-pro",
    maxLength: 127072,
  },
  "sonar-reasoning": {
    id: "sonar-reasoning",
    name: "sonar-reasoning",
    maxLength: 127072,
  },
  "sonar-pro": {
    id: "sonar-pro",
    name: "sonar-pro",
    ...
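// Illustrative use of MODELS (a sketch; `promptWindowLimit` is an assumed
// helper name following this codebase's provider conventions):
//   static promptWindowLimit(modelName) {
//     return MODELS[modelName]?.maxLength ?? 4096;
//   }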
| Model | Parameter Count | Context Length | Model Type |
| :---------------------------------- | :-------------- | :------------- | :-------------- |
| `sonar-reasoning-pro` | 8B | 127,072 | Chat Completion |
| `sonar-reasoning` | 8B | 127,072 | Chat Completion |
| `sonar-pro` | 8B | 200,000 | Chat Completion |
| `sonar` | 8B | 127,072 | Chat Completion |