From c59ab9da0a4996c76b3504347197cd280e227192 Mon Sep 17 00:00:00 2001 From: Timothy Carambat <rambat1010@gmail.com> Date: Wed, 14 Feb 2024 12:32:07 -0800 Subject: [PATCH] Refactor LLM chat backend (#717) * refactor stream/chat/embed-stream to be a single execution logic path so that it is easier to maintain and build upon * no thread in sync chat since only the API uses it; adjust import locations --- server/endpoints/api/workspace/index.js | 10 +- server/endpoints/chat.js | 2 +- server/endpoints/embed/index.js | 6 +- server/endpoints/workspaceThreads.js | 2 +- server/endpoints/workspaces.js | 2 +- server/utils/AiProviders/azureOpenAi/index.js | 2 +- server/utils/AiProviders/gemini/index.js | 2 +- server/utils/AiProviders/huggingface/index.js | 2 +- server/utils/AiProviders/lmStudio/index.js | 2 +- server/utils/AiProviders/localAi/index.js | 2 +- server/utils/AiProviders/mistral/index.js | 2 +- server/utils/AiProviders/native/index.js | 2 +- server/utils/AiProviders/ollama/index.js | 2 +- server/utils/AiProviders/openAi/index.js | 2 +- server/utils/AiProviders/togetherAi/index.js | 2 +- server/utils/chats/embed.js | 123 +++------ server/utils/chats/index.js | 191 ++++--------- server/utils/chats/stream.js | 253 +++--------------- server/utils/helpers/chat/index.js | 2 +- server/utils/helpers/chat/responses.js | 144 ++++++++++ 20 files changed, 287 insertions(+), 468 deletions(-) create mode 100644 server/utils/helpers/chat/responses.js diff --git a/server/endpoints/api/workspace/index.js b/server/endpoints/api/workspace/index.js index b846f3f8d..122602873 100644 --- a/server/endpoints/api/workspace/index.js +++ b/server/endpoints/api/workspace/index.js @@ -4,19 +4,19 @@ const { Telemetry } = require("../../../models/telemetry"); const { DocumentVectors } = require("../../../models/vectors"); const { Workspace } = require("../../../models/workspace"); const { WorkspaceChats } = require("../../../models/workspaceChats"); -const { - convertToChatHistory, - chatWithWorkspace, -} = require("../../../utils/chats"); +const { chatWithWorkspace } = require("../../../utils/chats"); const { getVectorDbClass } = require("../../../utils/helpers"); const { multiUserMode, reqBody } = require("../../../utils/http"); const { validApiKey } = require("../../../utils/middleware/validApiKey"); const { streamChatWithWorkspace, - writeResponseChunk, VALID_CHAT_MODE, } = require("../../../utils/chats/stream"); const { EventLogs } = require("../../../models/eventLogs"); +const { + convertToChatHistory, + writeResponseChunk, +} = require("../../../utils/helpers/chat/responses"); function apiWorkspaceEndpoints(app) { if (!app) return; diff --git a/server/endpoints/chat.js b/server/endpoints/chat.js index d45ad7b4d..756944bea 100644 --- a/server/endpoints/chat.js +++ b/server/endpoints/chat.js @@ -7,7 +7,6 @@ const { SystemSettings } = require("../models/systemSettings"); const { Telemetry } = require("../models/telemetry"); const { streamChatWithWorkspace, - writeResponseChunk, VALID_CHAT_MODE, } = require("../utils/chats/stream"); const { @@ -18,6 +17,7 @@ const { EventLogs } = require("../models/eventLogs"); const { validWorkspaceAndThreadSlug, } = require("../utils/middleware/validWorkspace"); +const { writeResponseChunk } = require("../utils/helpers/chat/responses"); function chatEndpoints(app) { if (!app) return; diff --git a/server/endpoints/embed/index.js b/server/endpoints/embed/index.js index 087467282..0631a655d 100644 --- a/server/endpoints/embed/index.js +++ b/server/endpoints/embed/index.js @@ -1,15 +1,17 @@ 
const { v4: uuidv4 } = require("uuid"); const { reqBody, multiUserMode } = require("../../utils/http"); const { Telemetry } = require("../../models/telemetry"); -const { writeResponseChunk } = require("../../utils/chats/stream"); const { streamChatWithForEmbed } = require("../../utils/chats/embed"); -const { convertToChatHistory } = require("../../utils/chats"); const { EmbedChats } = require("../../models/embedChats"); const { validEmbedConfig, canRespond, setConnectionMeta, } = require("../../utils/middleware/embedMiddleware"); +const { + convertToChatHistory, + writeResponseChunk, +} = require("../../utils/helpers/chat/responses"); function embeddedEndpoints(app) { if (!app) return; diff --git a/server/endpoints/workspaceThreads.js b/server/endpoints/workspaceThreads.js index d1d0909cc..6ebf2ef4a 100644 --- a/server/endpoints/workspaceThreads.js +++ b/server/endpoints/workspaceThreads.js @@ -12,7 +12,7 @@ const { validWorkspaceAndThreadSlug, } = require("../utils/middleware/validWorkspace"); const { WorkspaceChats } = require("../models/workspaceChats"); -const { convertToChatHistory } = require("../utils/chats"); +const { convertToChatHistory } = require("../utils/helpers/chat/responses"); function workspaceThreadEndpoints(app) { if (!app) return; diff --git a/server/endpoints/workspaces.js b/server/endpoints/workspaces.js index c6a3ad937..944be1de7 100644 --- a/server/endpoints/workspaces.js +++ b/server/endpoints/workspaces.js @@ -3,7 +3,6 @@ const { Workspace } = require("../models/workspace"); const { Document } = require("../models/documents"); const { DocumentVectors } = require("../models/vectors"); const { WorkspaceChats } = require("../models/workspaceChats"); -const { convertToChatHistory } = require("../utils/chats"); const { getVectorDbClass } = require("../utils/helpers"); const { setupMulter } = require("../utils/files/multer"); const { @@ -22,6 +21,7 @@ const { WorkspaceSuggestedMessages, } = require("../models/workspacesSuggestedMessages"); const { validWorkspaceSlug } = require("../utils/middleware/validWorkspace"); +const { convertToChatHistory } = require("../utils/helpers/chat/responses"); const { handleUploads } = setupMulter(); function workspaceEndpoints(app) { diff --git a/server/utils/AiProviders/azureOpenAi/index.js b/server/utils/AiProviders/azureOpenAi/index.js index eac47f0ef..2ac6de3a1 100644 --- a/server/utils/AiProviders/azureOpenAi/index.js +++ b/server/utils/AiProviders/azureOpenAi/index.js @@ -1,6 +1,6 @@ const { AzureOpenAiEmbedder } = require("../../EmbeddingEngines/azureOpenAi"); const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../chats/stream"); +const { writeResponseChunk } = require("../../helpers/chat/responses"); class AzureOpenAiLLM { constructor(embedder = null, _modelPreference = null) { diff --git a/server/utils/AiProviders/gemini/index.js b/server/utils/AiProviders/gemini/index.js index 36c63df72..bd84a3856 100644 --- a/server/utils/AiProviders/gemini/index.js +++ b/server/utils/AiProviders/gemini/index.js @@ -1,5 +1,5 @@ const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../chats/stream"); +const { writeResponseChunk } = require("../../helpers/chat/responses"); class GeminiLLM { constructor(embedder = null, modelPreference = null) { diff --git a/server/utils/AiProviders/huggingface/index.js b/server/utils/AiProviders/huggingface/index.js index 8fcc2b47e..416e622a3 100644 --- a/server/utils/AiProviders/huggingface/index.js +++ 
b/server/utils/AiProviders/huggingface/index.js @@ -1,7 +1,7 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { OpenAiEmbedder } = require("../../EmbeddingEngines/openAi"); const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../chats/stream"); +const { writeResponseChunk } = require("../../helpers/chat/responses"); class HuggingFaceLLM { constructor(embedder = null, _modelPreference = null) { diff --git a/server/utils/AiProviders/lmStudio/index.js b/server/utils/AiProviders/lmStudio/index.js index fe8064b68..455edce3a 100644 --- a/server/utils/AiProviders/lmStudio/index.js +++ b/server/utils/AiProviders/lmStudio/index.js @@ -1,5 +1,5 @@ const { chatPrompt } = require("../../chats"); -const { handleDefaultStreamResponse } = require("../../chats/stream"); +const { handleDefaultStreamResponse } = require("../../helpers/chat/responses"); // hybrid of openAi LLM chat completion for LMStudio class LMStudioLLM { diff --git a/server/utils/AiProviders/localAi/index.js b/server/utils/AiProviders/localAi/index.js index 2717c47f3..3e5cfbf77 100644 --- a/server/utils/AiProviders/localAi/index.js +++ b/server/utils/AiProviders/localAi/index.js @@ -1,5 +1,5 @@ const { chatPrompt } = require("../../chats"); -const { handleDefaultStreamResponse } = require("../../chats/stream"); +const { handleDefaultStreamResponse } = require("../../helpers/chat/responses"); class LocalAiLLM { constructor(embedder = null, modelPreference = null) { diff --git a/server/utils/AiProviders/mistral/index.js b/server/utils/AiProviders/mistral/index.js index 785a8dd08..21b17f01a 100644 --- a/server/utils/AiProviders/mistral/index.js +++ b/server/utils/AiProviders/mistral/index.js @@ -1,5 +1,5 @@ const { chatPrompt } = require("../../chats"); -const { handleDefaultStreamResponse } = require("../../chats/stream"); +const { handleDefaultStreamResponse } = require("../../helpers/chat/responses"); class MistralLLM { constructor(embedder = null, modelPreference = null) { diff --git a/server/utils/AiProviders/native/index.js b/server/utils/AiProviders/native/index.js index 4b96d02ef..157fb7520 100644 --- a/server/utils/AiProviders/native/index.js +++ b/server/utils/AiProviders/native/index.js @@ -2,7 +2,7 @@ const fs = require("fs"); const path = require("path"); const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../chats/stream"); +const { writeResponseChunk } = require("../../helpers/chat/responses"); // Docs: https://api.js.langchain.com/classes/chat_models_llama_cpp.ChatLlamaCpp.html const ChatLlamaCpp = (...args) => diff --git a/server/utils/AiProviders/ollama/index.js b/server/utils/AiProviders/ollama/index.js index efd4649f1..035d4a9d0 100644 --- a/server/utils/AiProviders/ollama/index.js +++ b/server/utils/AiProviders/ollama/index.js @@ -1,6 +1,6 @@ const { chatPrompt } = require("../../chats"); const { StringOutputParser } = require("langchain/schema/output_parser"); -const { writeResponseChunk } = require("../../chats/stream"); +const { writeResponseChunk } = require("../../helpers/chat/responses"); // Docs: https://github.com/jmorganca/ollama/blob/main/docs/api.md class OllamaAILLM { diff --git a/server/utils/AiProviders/openAi/index.js b/server/utils/AiProviders/openAi/index.js index dffd36824..c3c983f8d 100644 --- a/server/utils/AiProviders/openAi/index.js +++ b/server/utils/AiProviders/openAi/index.js @@ -1,6 +1,6 @@ const { OpenAiEmbedder } = 
require("../../EmbeddingEngines/openAi"); const { chatPrompt } = require("../../chats"); -const { handleDefaultStreamResponse } = require("../../chats/stream"); +const { handleDefaultStreamResponse } = require("../../helpers/chat/responses"); class OpenAiLLM { constructor(embedder = null, modelPreference = null) { diff --git a/server/utils/AiProviders/togetherAi/index.js b/server/utils/AiProviders/togetherAi/index.js index 7291622a7..15b254a15 100644 --- a/server/utils/AiProviders/togetherAi/index.js +++ b/server/utils/AiProviders/togetherAi/index.js @@ -1,5 +1,5 @@ const { chatPrompt } = require("../../chats"); -const { writeResponseChunk } = require("../../chats/stream"); +const { writeResponseChunk } = require("../../helpers/chat/responses"); function togetherAiModels() { const { MODELS } = require("./models.js"); diff --git a/server/utils/chats/embed.js b/server/utils/chats/embed.js index 5a7b93b49..7a4c52d17 100644 --- a/server/utils/chats/embed.js +++ b/server/utils/chats/embed.js @@ -1,8 +1,11 @@ const { v4: uuidv4 } = require("uuid"); const { getVectorDbClass, getLLMProvider } = require("../helpers"); -const { chatPrompt, convertToPromptHistory } = require("."); -const { writeResponseChunk } = require("./stream"); +const { chatPrompt } = require("./index"); const { EmbedChats } = require("../../models/embedChats"); +const { + convertToPromptHistory, + writeResponseChunk, +} = require("../helpers/chat/responses"); async function streamChatWithForEmbed( response, @@ -44,30 +47,20 @@ async function streamChatWithForEmbed( const messageLimit = 20; const hasVectorizedSpace = await VectorDb.hasNamespace(embed.workspace.slug); const embeddingsCount = await VectorDb.namespaceCount(embed.workspace.slug); - if (!hasVectorizedSpace || embeddingsCount === 0) { - if (chatMode === "query") { - writeResponseChunk(response, { - id: uuid, - type: "textResponse", - textResponse: - "I do not have enough information to answer that. Try another question.", - sources: [], - close: true, - error: null, - }); - return; - } - // If there are no embeddings - chat like a normal LLM chat interface. - return await streamEmptyEmbeddingChat({ - response, - uuid, - sessionId, - message, - embed, - messageLimit, - LLMConnector, + // User is trying to query-mode chat a workspace that has no data in it - so + // we should exit early as no information can be found under these conditions. + if ((!hasVectorizedSpace || embeddingsCount === 0) && chatMode === "query") { + writeResponseChunk(response, { + id: uuid, + type: "textResponse", + textResponse: + "I do not have enough information to answer that. Try another question.", + sources: [], + close: true, + error: null, }); + return; } let completeText; @@ -77,17 +70,24 @@ async function streamChatWithForEmbed( messageLimit, chatMode ); + const { contextTexts = [], sources = [], message: error, - } = await VectorDb.performSimilaritySearch({ - namespace: embed.workspace.slug, - input: message, - LLMConnector, - similarityThreshold: embed.workspace?.similarityThreshold, - topN: embed.workspace?.topN, - }); + } = embeddingsCount !== 0 // if there no embeddings don't bother searching. + ? await VectorDb.performSimilaritySearch({ + namespace: embed.workspace.slug, + input: message, + LLMConnector, + similarityThreshold: embed.workspace?.similarityThreshold, + topN: embed.workspace?.topN, + }) + : { + contextTexts: [], + sources: [], + message: null, + }; // Failed similarity search. 
if (!!error) { @@ -176,7 +176,7 @@ async function recentEmbedChatHistory( messageLimit = 20, chatMode = null ) { - if (chatMode === "query") return []; + if (chatMode === "query") return { rawHistory: [], chatHistory: [] }; const rawHistory = ( await EmbedChats.forEmbedByUser(embed.id, sessionId, messageLimit, { id: "desc", @@ -185,65 +185,6 @@ async function recentEmbedChatHistory( return { rawHistory, chatHistory: convertToPromptHistory(rawHistory) }; } -async function streamEmptyEmbeddingChat({ - response, - uuid, - sessionId, - message, - embed, - messageLimit, - LLMConnector, -}) { - let completeText; - const { rawHistory, chatHistory } = await recentEmbedChatHistory( - sessionId, - embed, - messageLimit - ); - - if (LLMConnector.streamingEnabled() !== true) { - console.log( - `\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.` - ); - completeText = await LLMConnector.sendChat( - chatHistory, - message, - embed.workspace, - rawHistory - ); - writeResponseChunk(response, { - uuid, - type: "textResponseChunk", - textResponse: completeText, - sources: [], - close: true, - error: false, - }); - } - - const stream = await LLMConnector.streamChat( - chatHistory, - message, - embed.workspace, - rawHistory - ); - completeText = await LLMConnector.handleStream(response, stream, { - uuid, - sources: [], - }); - - await EmbedChats.new({ - embedId: embed.id, - prompt: message, - response: { text: completeText, type: "chat" }, - connection_information: response.locals.connection - ? { ...response.locals.connection } - : {}, - sessionId, - }); - return; -} - module.exports = { streamChatWithForEmbed, }; diff --git a/server/utils/chats/index.js b/server/utils/chats/index.js index d25d2a93b..6d8cccf9a 100644 --- a/server/utils/chats/index.js +++ b/server/utils/chats/index.js @@ -1,46 +1,8 @@ const { v4: uuidv4 } = require("uuid"); const { WorkspaceChats } = require("../../models/workspaceChats"); const { resetMemory } = require("./commands/reset"); -const moment = require("moment"); const { getVectorDbClass, getLLMProvider } = require("../helpers"); - -function convertToChatHistory(history = []) { - const formattedHistory = []; - history.forEach((history) => { - const { prompt, response, createdAt, feedbackScore = null, id } = history; - const data = JSON.parse(response); - formattedHistory.push([ - { - role: "user", - content: prompt, - sentAt: moment(createdAt).unix(), - }, - { - role: "assistant", - content: data.text, - sources: data.sources || [], - chatId: id, - sentAt: moment(createdAt).unix(), - feedbackScore, - }, - ]); - }); - - return formattedHistory.flat(); -} - -function convertToPromptHistory(history = []) { - const formattedHistory = []; - history.forEach((history) => { - const { prompt, response } = history; - const data = JSON.parse(response); - formattedHistory.push([ - { role: "user", content: prompt }, - { role: "assistant", content: data.text }, - ]); - }); - return formattedHistory.flat(); -} +const { convertToPromptHistory } = require("../helpers/chat/responses"); const VALID_COMMANDS = { "/reset": resetMemory, @@ -64,7 +26,8 @@ async function chatWithWorkspace( workspace, message, chatMode = "chat", - user = null + user = null, + thread = null ) { const uuid = uuidv4(); const command = grepCommand(message); @@ -92,49 +55,51 @@ async function chatWithWorkspace( const messageLimit = workspace?.openAiHistory || 20; const hasVectorizedSpace = await VectorDb.hasNamespace(workspace.slug); const 
embeddingsCount = await VectorDb.namespaceCount(workspace.slug); - if (!hasVectorizedSpace || embeddingsCount === 0) { - if (chatMode === "query") { - return { - id: uuid, - type: "textResponse", - sources: [], - close: true, - error: null, - textResponse: - "There is no relevant information in this workspace to answer your query.", - }; - } - // If there are no embeddings - chat like a normal LLM chat interface. - return await emptyEmbeddingChat({ - uuid, - user, - message, - workspace, - messageLimit, - LLMConnector, - }); + // User is trying to query-mode chat a workspace that has no data in it - so + // we should exit early as no information can be found under these conditions. + if ((!hasVectorizedSpace || embeddingsCount === 0) && chatMode === "query") { + return { + id: uuid, + type: "textResponse", + sources: [], + close: true, + error: null, + textResponse: + "There is no relevant information in this workspace to answer your query.", + }; } - const { rawHistory, chatHistory } = await recentChatHistory( + // If we are here we know that we are in a workspace that is: + // 1. Chatting in "chat" mode and may or may _not_ have embeddings + // 2. Chatting in "query" mode and has at least 1 embedding + const { rawHistory, chatHistory } = await recentChatHistory({ user, workspace, + thread, messageLimit, - chatMode - ); + chatMode, + }); + const { contextTexts = [], sources = [], message: error, - } = await VectorDb.performSimilaritySearch({ - namespace: workspace.slug, - input: message, - LLMConnector, - similarityThreshold: workspace?.similarityThreshold, - topN: workspace?.topN, - }); + } = embeddingsCount !== 0 // if there no embeddings don't bother searching. + ? await VectorDb.performSimilaritySearch({ + namespace: workspace.slug, + input: message, + LLMConnector, + similarityThreshold: workspace?.similarityThreshold, + topN: workspace?.topN, + }) + : { + contextTexts: [], + sources: [], + message: null, + }; - // Failed similarity search. + // Failed similarity search if it was run at all and failed. if (!!error) { return { id: uuid, @@ -147,7 +112,7 @@ async function chatWithWorkspace( } // If in query mode and no sources are found, do not - // let the LLM try to hallucinate a response or use general knowledge + // let the LLM try to hallucinate a response or use general knowledge and exit early if (chatMode === "query" && sources.length === 0) { return { id: uuid, @@ -160,7 +125,7 @@ async function chatWithWorkspace( }; } - // Compress message to ensure prompt passes token limit with room for response + // Compress & Assemble message to ensure prompt passes token limit with room for response // and build system messages based on inputs and history. const messages = await LLMConnector.compressMessages( { @@ -187,10 +152,12 @@ async function chatWithWorkspace( error: "No text completion could be completed with this input.", }; } + const { chat } = await WorkspaceChats.new({ workspaceId: workspace.id, prompt: message, response: { text: textResponse, sources, type: chatMode }, + threadId: thread?.id || null, user, }); return { @@ -204,41 +171,14 @@ async function chatWithWorkspace( }; } -// On query we dont return message history. All other chat modes and when chatting -// with no embeddings we return history. -// TODO: Refactor to just run a .where on WorkspaceChat to simplify what is going on here. 
-// see recentThreadChatHistory -async function recentChatHistory( +async function recentChatHistory({ user = null, workspace, + thread = null, messageLimit = 20, - chatMode = null -) { - if (chatMode === "query") return []; - const rawHistory = ( - user - ? await WorkspaceChats.forWorkspaceByUser( - workspace.id, - user.id, - messageLimit, - { id: "desc" } - ) - : await WorkspaceChats.forWorkspace(workspace.id, messageLimit, { - id: "desc", - }) - ).reverse(); - return { rawHistory, chatHistory: convertToPromptHistory(rawHistory) }; -} - -// Extension of recentChatHistory that supports threads -async function recentThreadChatHistory( - user = null, - workspace, - thread, - messageLimit = 20, - chatMode = null -) { - if (chatMode === "query") return []; + chatMode = null, +}) { + if (chatMode === "query") return { rawHistory: [], chatHistory: [] }; const rawHistory = ( await WorkspaceChats.where( { @@ -254,42 +194,6 @@ async function recentThreadChatHistory( return { rawHistory, chatHistory: convertToPromptHistory(rawHistory) }; } -async function emptyEmbeddingChat({ - uuid, - user, - message, - workspace, - messageLimit, - LLMConnector, -}) { - const { rawHistory, chatHistory } = await recentChatHistory( - user, - workspace, - messageLimit - ); - const textResponse = await LLMConnector.sendChat( - chatHistory, - message, - workspace, - rawHistory - ); - const { chat } = await WorkspaceChats.new({ - workspaceId: workspace.id, - prompt: message, - response: { text: textResponse, sources: [], type: "chat" }, - user, - }); - return { - id: uuid, - type: "textResponse", - sources: [], - close: true, - error: null, - chatId: chat.id, - textResponse, - }; -} - function chatPrompt(workspace) { return ( workspace?.openAiPrompt ?? @@ -299,9 +203,6 @@ function chatPrompt(workspace) { module.exports = { recentChatHistory, - recentThreadChatHistory, - convertToPromptHistory, - convertToChatHistory, chatWithWorkspace, chatPrompt, grepCommand, diff --git a/server/utils/chats/stream.js b/server/utils/chats/stream.js index 0fe5a7eaf..4f86c49d6 100644 --- a/server/utils/chats/stream.js +++ b/server/utils/chats/stream.js @@ -1,19 +1,15 @@ const { v4: uuidv4 } = require("uuid"); const { WorkspaceChats } = require("../../models/workspaceChats"); const { getVectorDbClass, getLLMProvider } = require("../helpers"); +const { writeResponseChunk } = require("../helpers/chat/responses"); const { grepCommand, - recentChatHistory, VALID_COMMANDS, chatPrompt, - recentThreadChatHistory, -} = require("."); + recentChatHistory, +} = require("./index"); const VALID_CHAT_MODE = ["chat", "query"]; -function writeResponseChunk(response, data) { - response.write(`data: ${JSON.stringify(data)}\n\n`); - return; -} async function streamChatWithWorkspace( response, @@ -58,59 +54,53 @@ async function streamChatWithWorkspace( const messageLimit = workspace?.openAiHistory || 20; const hasVectorizedSpace = await VectorDb.hasNamespace(workspace.slug); const embeddingsCount = await VectorDb.namespaceCount(workspace.slug); - if (!hasVectorizedSpace || embeddingsCount === 0) { - if (chatMode === "query") { - writeResponseChunk(response, { - id: uuid, - type: "textResponse", - textResponse: - "There is no relevant information in this workspace to answer your query.", - sources: [], - close: true, - error: null, - }); - return; - } - // If there are no embeddings - chat like a normal LLM chat interface. - // no need to pass in chat mode - because if we are here we are in - // "chat" mode + have embeddings. 
- return await streamEmptyEmbeddingChat({ - response, - uuid, - user, - message, - workspace, - messageLimit, - LLMConnector, - thread, + // User is trying to query-mode chat a workspace that has no data in it - so + // we should exit early as no information can be found under these conditions. + if ((!hasVectorizedSpace || embeddingsCount === 0) && chatMode === "query") { + writeResponseChunk(response, { + id: uuid, + type: "textResponse", + textResponse: + "There is no relevant information in this workspace to answer your query.", + sources: [], + close: true, + error: null, }); + return; } + // If we are here we know that we are in a workspace that is: + // 1. Chatting in "chat" mode and may or may _not_ have embeddings + // 2. Chatting in "query" mode and has at least 1 embedding let completeText; - const { rawHistory, chatHistory } = thread - ? await recentThreadChatHistory( - user, - workspace, - thread, - messageLimit, - chatMode - ) - : await recentChatHistory(user, workspace, messageLimit, chatMode); + const { rawHistory, chatHistory } = await recentChatHistory({ + user, + workspace, + thread, + messageLimit, + chatMode, + }); const { contextTexts = [], sources = [], message: error, - } = await VectorDb.performSimilaritySearch({ - namespace: workspace.slug, - input: message, - LLMConnector, - similarityThreshold: workspace?.similarityThreshold, - topN: workspace?.topN, - }); + } = embeddingsCount !== 0 // if there no embeddings don't bother searching. + ? await VectorDb.performSimilaritySearch({ + namespace: workspace.slug, + input: message, + LLMConnector, + similarityThreshold: workspace?.similarityThreshold, + topN: workspace?.topN, + }) + : { + contextTexts: [], + sources: [], + message: null, + }; - // Failed similarity search. + // Failed similarity search if it was run at all and failed. if (!!error) { writeResponseChunk(response, { id: uuid, @@ -124,7 +114,7 @@ async function streamChatWithWorkspace( } // If in query mode and no sources are found, do not - // let the LLM try to hallucinate a response or use general knowledge + // let the LLM try to hallucinate a response or use general knowledge and exit early if (chatMode === "query" && sources.length === 0) { writeResponseChunk(response, { id: uuid, @@ -138,7 +128,7 @@ async function streamChatWithWorkspace( return; } - // Compress message to ensure prompt passes token limit with room for response + // Compress & Assemble message to ensure prompt passes token limit with room for response // and build system messages based on inputs and history. const messages = await LLMConnector.compressMessages( { @@ -181,7 +171,7 @@ async function streamChatWithWorkspace( workspaceId: workspace.id, prompt: message, response: { text: completeText, sources, type: chatMode }, - threadId: thread?.id, + threadId: thread?.id || null, user, }); @@ -195,166 +185,7 @@ async function streamChatWithWorkspace( return; } -async function streamEmptyEmbeddingChat({ - response, - uuid, - user, - message, - workspace, - messageLimit, - LLMConnector, - thread = null, -}) { - let completeText; - const { rawHistory, chatHistory } = thread - ? await recentThreadChatHistory(user, workspace, thread, messageLimit) - : await recentChatHistory(user, workspace, messageLimit); - - // If streaming is not explicitly enabled for connector - // we do regular waiting of a response and send a single chunk. 
- if (LLMConnector.streamingEnabled() !== true) { - console.log( - `\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.` - ); - completeText = await LLMConnector.sendChat( - chatHistory, - message, - workspace, - rawHistory - ); - writeResponseChunk(response, { - uuid, - type: "textResponseChunk", - textResponse: completeText, - sources: [], - close: true, - error: false, - }); - } else { - const stream = await LLMConnector.streamChat( - chatHistory, - message, - workspace, - rawHistory - ); - completeText = await LLMConnector.handleStream(response, stream, { - uuid, - sources: [], - }); - } - - const { chat } = await WorkspaceChats.new({ - workspaceId: workspace.id, - prompt: message, - response: { text: completeText, sources: [], type: "chat" }, - threadId: thread?.id, - user, - }); - - writeResponseChunk(response, { - uuid, - type: "finalizeResponseStream", - close: true, - error: false, - chatId: chat.id, - }); - return; -} - -// The default way to handle a stream response. Functions best with OpenAI. -function handleDefaultStreamResponse(response, stream, responseProps) { - const { uuid = uuidv4(), sources = [] } = responseProps; - - return new Promise((resolve) => { - let fullText = ""; - let chunk = ""; - stream.data.on("data", (data) => { - const lines = data - ?.toString() - ?.split("\n") - .filter((line) => line.trim() !== ""); - - for (const line of lines) { - let validJSON = false; - const message = chunk + line.replace(/^data: /, ""); - - // JSON chunk is incomplete and has not ended yet - // so we need to stitch it together. You would think JSON - // chunks would only come complete - but they don't! - try { - JSON.parse(message); - validJSON = true; - } catch {} - - if (!validJSON) { - // It can be possible that the chunk decoding is running away - // and the message chunk fails to append due to string length. - // In this case abort the chunk and reset so we can continue. 
- // ref: https://github.com/Mintplex-Labs/anything-llm/issues/416 - try { - chunk += message; - } catch (e) { - console.error(`Chunk appending error`, e); - chunk = ""; - } - continue; - } else { - chunk = ""; - } - - if (message == "[DONE]") { - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - resolve(fullText); - } else { - let finishReason = null; - let token = ""; - try { - const json = JSON.parse(message); - token = json?.choices?.[0]?.delta?.content; - finishReason = json?.choices?.[0]?.finish_reason || null; - } catch { - continue; - } - - if (token) { - fullText += token; - writeResponseChunk(response, { - uuid, - sources: [], - type: "textResponseChunk", - textResponse: token, - close: false, - error: false, - }); - } - - if (finishReason !== null) { - writeResponseChunk(response, { - uuid, - sources, - type: "textResponseChunk", - textResponse: "", - close: true, - error: false, - }); - resolve(fullText); - } - } - } - }); - }); -} - module.exports = { VALID_CHAT_MODE, streamChatWithWorkspace, - writeResponseChunk, - handleDefaultStreamResponse, }; diff --git a/server/utils/helpers/chat/index.js b/server/utils/helpers/chat/index.js index ed7eab90f..7292c422e 100644 --- a/server/utils/helpers/chat/index.js +++ b/server/utils/helpers/chat/index.js @@ -1,5 +1,5 @@ -const { convertToPromptHistory } = require("../../chats"); const { TokenManager } = require("../tiktoken"); +const { convertToPromptHistory } = require("./responses"); /* What is the message Array compressor? diff --git a/server/utils/helpers/chat/responses.js b/server/utils/helpers/chat/responses.js new file mode 100644 index 000000000..c4371d818 --- /dev/null +++ b/server/utils/helpers/chat/responses.js @@ -0,0 +1,144 @@ +const { v4: uuidv4 } = require("uuid"); +const moment = require("moment"); + +// The default way to handle a stream response. Functions best with OpenAI. +// Currently used for LMStudio, LocalAI, Mistral API, and OpenAI +function handleDefaultStreamResponse(response, stream, responseProps) { + const { uuid = uuidv4(), sources = [] } = responseProps; + + return new Promise((resolve) => { + let fullText = ""; + let chunk = ""; + stream.data.on("data", (data) => { + const lines = data + ?.toString() + ?.split("\n") + .filter((line) => line.trim() !== ""); + + for (const line of lines) { + let validJSON = false; + const message = chunk + line.replace(/^data: /, ""); + + // JSON chunk is incomplete and has not ended yet + // so we need to stitch it together. You would think JSON + // chunks would only come complete - but they don't! + try { + JSON.parse(message); + validJSON = true; + } catch {} + + if (!validJSON) { + // It can be possible that the chunk decoding is running away + // and the message chunk fails to append due to string length. + // In this case abort the chunk and reset so we can continue. 
+ // ref: https://github.com/Mintplex-Labs/anything-llm/issues/416 + try { + chunk += message; + } catch (e) { + console.error(`Chunk appending error`, e); + chunk = ""; + } + continue; + } else { + chunk = ""; + } + + if (message == "[DONE]") { + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + } else { + let finishReason = null; + let token = ""; + try { + const json = JSON.parse(message); + token = json?.choices?.[0]?.delta?.content; + finishReason = json?.choices?.[0]?.finish_reason || null; + } catch { + continue; + } + + if (token) { + fullText += token; + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: token, + close: false, + error: false, + }); + } + + if (finishReason !== null) { + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + resolve(fullText); + } + } + } + }); + }); +} + +function convertToChatHistory(history = []) { + const formattedHistory = []; + history.forEach((history) => { + const { prompt, response, createdAt, feedbackScore = null, id } = history; + const data = JSON.parse(response); + formattedHistory.push([ + { + role: "user", + content: prompt, + sentAt: moment(createdAt).unix(), + }, + { + role: "assistant", + content: data.text, + sources: data.sources || [], + chatId: id, + sentAt: moment(createdAt).unix(), + feedbackScore, + }, + ]); + }); + + return formattedHistory.flat(); +} + +function convertToPromptHistory(history = []) { + const formattedHistory = []; + history.forEach((history) => { + const { prompt, response } = history; + const data = JSON.parse(response); + formattedHistory.push([ + { role: "user", content: prompt }, + { role: "assistant", content: data.text }, + ]); + }); + return formattedHistory.flat(); +} + +function writeResponseChunk(response, data) { + response.write(`data: ${JSON.stringify(data)}\n\n`); + return; +} + +module.exports = { + handleDefaultStreamResponse, + convertToChatHistory, + convertToPromptHistory, + writeResponseChunk, +}; -- GitLab