From f5bb064dee7c9c7c28f4071051f7247dbc42a79f Mon Sep 17 00:00:00 2001
From: Timothy Carambat <rambat1010@gmail.com>
Date: Tue, 16 Jan 2024 10:37:46 -0800
Subject: [PATCH] Implement streaming for workspace chats via API (#604)
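
Adds a new streamable chat endpoint, POST /v1/workspace/:slug/stream-chat,
to the developer API. The endpoint sits behind the validApiKey middleware,
validates the requested mode against the shared VALID_CHAT_MODE list (now
also enforced on the existing chat endpoints), and streams the LLM reply
back as server-sent events: each frame is written by writeResponseChunk as
a `data: <json>` block, and the final frame carries close: true along with
the source chunks used in the answer.

A minimal consumer sketch follows; it is not part of this patch. It
assumes Node 18+ with the global fetch API, a server reachable at
http://localhost:3001/api, and Bearer-style API-key auth -- the base URL,
key, and workspace slug below are placeholders:

    // stream-chat-client.js -- hypothetical example, not shipped here.
    const BASE_URL = "http://localhost:3001/api"; // assumed API mount point
    const API_KEY = "MY-API-KEY"; // assumed placeholder key

    async function streamChat(slug, message, mode = "query") {
      const res = await fetch(`${BASE_URL}/v1/workspace/${slug}/stream-chat`, {
        method: "POST",
        headers: {
          Authorization: `Bearer ${API_KEY}`,
          "Content-Type": "application/json",
        },
        body: JSON.stringify({ message, mode }),
      });
      if (!res.ok) throw new Error(`HTTP ${res.status}: ${await res.text()}`);

      // Frames arrive as `data: <json>` blocks separated by blank lines,
      // so split the decoded byte stream on "\n\n" and parse each frame.
      const decoder = new TextDecoder();
      let buffer = "";
      for await (const bytes of res.body) {
        buffer += decoder.decode(bytes, { stream: true });
        let end;
        while ((end = buffer.indexOf("\n\n")) !== -1) {
          const frame = buffer.slice(0, end);
          buffer = buffer.slice(end + 2);
          if (!frame.startsWith("data: ")) continue;
          const chunk = JSON.parse(frame.slice(6));
          if (chunk.type === "abort") throw new Error(chunk.error);
          process.stdout.write(chunk.textResponse ?? "");
          if (chunk.close) return chunk.sources;
        }
      }
    }

    streamChat("my-workspace", "What is AnythingLLM?", "chat")
      .then((sources) => console.log("\nsources:", sources))
      .catch(console.error);

Any SSE-capable HTTP client can consume the stream the same way; the only
contract is the data-prefixed JSON frame format documented in the swagger
example below.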

---
 server/endpoints/api/workspace/index.js | 148 +++++++++++++++++++++++-
 server/endpoints/chat.js                |  15 +++
 server/swagger/openapi.json             |  99 ++++++++++++++++
 server/utils/chats/stream.js            |   2 +
 4 files changed, 263 insertions(+), 1 deletion(-)

diff --git a/server/endpoints/api/workspace/index.js b/server/endpoints/api/workspace/index.js
index ffead3adb..365e8b014 100644
--- a/server/endpoints/api/workspace/index.js
+++ b/server/endpoints/api/workspace/index.js
@@ -11,6 +11,11 @@ const {
 const { getVectorDbClass } = require("../../../utils/helpers");
 const { multiUserMode, reqBody } = require("../../../utils/http");
 const { validApiKey } = require("../../../utils/middleware/validApiKey");
+const {
+  streamChatWithWorkspace,
+  writeResponseChunk,
+  VALID_CHAT_MODE,
+} = require("../../../utils/chats/stream");
 
 function apiWorkspaceEndpoints(app) {
   if (!app) return;
@@ -483,7 +488,28 @@ function apiWorkspaceEndpoints(app) {
         const workspace = await Workspace.get({ slug });
 
         if (!workspace) {
-          response.sendStatus(400).end();
+          response.status(400).json({
+            id: uuidv4(),
+            type: "abort",
+            textResponse: null,
+            sources: [],
+            close: true,
+            error: `Workspace ${slug} is not a valid workspace.`,
+          });
+          return;
+        }
+
+        if (!message?.length || !VALID_CHAT_MODE.includes(mode)) {
+          response.status(400).json({
+            id: uuidv4(),
+            type: "abort",
+            textResponse: null,
+            sources: [],
+            close: true,
+            error: !message?.length
+              ? "message parameter cannot be empty."
+              : `${mode} is not a valid mode.`,
+          });
           return;
         }
 
@@ -506,6 +532,126 @@ function apiWorkspaceEndpoints(app) {
       }
     }
   );
+
+  app.post(
+    "/v1/workspace/:slug/stream-chat",
+    [validApiKey],
+    async (request, response) => {
+      /*
+   #swagger.tags = ['Workspaces']
+   #swagger.description = 'Execute a streamable chat with a workspace'
+   #swagger.requestBody = {
+       description: 'Send a prompt to the workspace with the type of conversation (query or chat).<br/><b>Query:</b> Will not use the LLM unless there are relevant sources from the vector DB, and does not recall chat history.<br/><b>Chat:</b> Uses LLM general knowledge together with custom embeddings to produce output, and uses rolling chat history.',
+       required: true,
+       type: 'object',
+       content: {
+         "application/json": {
+           example: {
+             message: "What is AnythingLLM?",
+             mode: "query | chat"
+           }
+         }
+       }
+     }
+   #swagger.responses[200] = {
+     content: {
+       "text/event-stream": {
+         schema: {
+           type: 'array',
+           example: [
+            {
+              id: 'uuid-123',
+              type: "abort | textResponseChunk",
+              textResponse: "First chunk",
+              sources: [],
+              close: false,
+              error: "null | text string of the failure mode."
+            },
+            {
+              id: 'uuid-123',
+              type: "abort | textResponseChunk",
+              textResponse: "chunk two",
+              sources: [],
+              close: false,
+              error: "null | text string of the failure mode."
+            },
+             {
+              id: 'uuid-123',
+              type: "abort | textResponseChunk",
+              textResponse: "final chunk of LLM output!",
+              sources: [{title: "anythingllm.txt", chunk: "This is a context chunk used by the LLM to answer the prompt. Sources are only returned with the final chunk."}],
+              close: true,
+              error: "null | text string of the failure mode."
+            }
+          ]
+         }
+       }
+     }
+   }
+   #swagger.responses[403] = {
+     schema: {
+       "$ref": "#/definitions/InvalidAPIKey"
+     }
+   }
+   */
+      try {
+        const { slug } = request.params;
+        const { message, mode = "query" } = reqBody(request);
+        const workspace = await Workspace.get({ slug });
+
+        if (!workspace) {
+          response.status(400).json({
+            id: uuidv4(),
+            type: "abort",
+            textResponse: null,
+            sources: [],
+            close: true,
+            error: `Workspace ${slug} is not a valid workspace.`,
+          });
+          return;
+        }
+
+        if (!message?.length || !VALID_CHAT_MODE.includes(mode)) {
+          response.status(400).json({
+            id: uuidv4(),
+            type: "abort",
+            textResponse: null,
+            sources: [],
+            close: true,
+            error: !message?.length
+              ? "message parameter cannot be empty."
+              : `${mode} is not a valid mode.`,
+          });
+          return;
+        }
+
+        response.setHeader("Cache-Control", "no-cache");
+        response.setHeader("Content-Type", "text/event-stream");
+        response.setHeader("Access-Control-Allow-Origin", "*");
+        response.setHeader("Connection", "keep-alive");
+        response.flushHeaders();
+
+        await streamChatWithWorkspace(response, workspace, message, mode);
+        await Telemetry.sendTelemetry("sent_chat", {
+          LLMSelection: process.env.LLM_PROVIDER || "openai",
+          Embedder: process.env.EMBEDDING_ENGINE || "inherit",
+          VectorDbSelection: process.env.VECTOR_DB || "pinecone",
+        });
+        response.end();
+      } catch (e) {
+        console.error(e);
+        writeResponseChunk(response, {
+          id: uuidv4(),
+          type: "abort",
+          textResponse: null,
+          sources: [],
+          close: true,
+          error: e.message,
+        });
+        response.end();
+      }
+    }
+  );
 }
 
 module.exports = { apiWorkspaceEndpoints };
diff --git a/server/endpoints/chat.js b/server/endpoints/chat.js
index 79fc10132..adfec0ec3 100644
--- a/server/endpoints/chat.js
+++ b/server/endpoints/chat.js
@@ -8,6 +8,7 @@ const { Telemetry } = require("../models/telemetry");
 const {
   streamChatWithWorkspace,
   writeResponseChunk,
+  VALID_CHAT_MODE,
 } = require("../utils/chats/stream");
 
 function chatEndpoints(app) {
@@ -31,6 +32,20 @@ function chatEndpoints(app) {
           return;
         }
 
+        if (!message?.length || !VALID_CHAT_MODE.includes(mode)) {
+          response.status(400).json({
+            id: uuidv4(),
+            type: "abort",
+            textResponse: null,
+            sources: [],
+            close: true,
+            error: !message?.length
+              ? "Message is empty."
+              : `${mode} is not a valid mode.`,
+          });
+          return;
+        }
+
         response.setHeader("Cache-Control", "no-cache");
         response.setHeader("Content-Type", "text/event-stream");
         response.setHeader("Access-Control-Allow-Origin", "*");
diff --git a/server/swagger/openapi.json b/server/swagger/openapi.json
index 7b675c44b..e7b07484a 100644
--- a/server/swagger/openapi.json
+++ b/server/swagger/openapi.json
@@ -1612,6 +1612,105 @@
         }
       }
     },
+    "/v1/workspace/{slug}/stream-chat": {
+      "post": {
+        "tags": [
+          "Workspaces"
+        ],
+        "description": "Execute a streamable chat with a workspace",
+        "parameters": [
+          {
+            "name": "slug",
+            "in": "path",
+            "required": true,
+            "schema": {
+              "type": "string"
+            }
+          },
+          {
+            "name": "Authorization",
+            "in": "header",
+            "schema": {
+              "type": "string"
+            }
+          }
+        ],
+        "responses": {
+          "200": {
+            "content": {
+              "text/event-stream": {
+                "schema": {
+                  "type": "array",
+                  "example": [
+                    {
+                      "id": "uuid-123",
+                      "type": "abort | textResponseChunk",
+                      "textResponse": "First chunk",
+                      "sources": [],
+                      "close": false,
+                      "error": "null | text string of the failure mode."
+                    },
+                    {
+                      "id": "uuid-123",
+                      "type": "abort | textResponseChunk",
+                      "textResponse": "chunk two",
+                      "sources": [],
+                      "close": false,
+                      "error": "null | text string of the failure mode."
+                    },
+                    {
+                      "id": "uuid-123",
+                      "type": "abort | textResponseChunk",
+                      "textResponse": "final chunk of LLM output!",
+                      "sources": [
+                        {
+                          "title": "anythingllm.txt",
+                          "chunk": "This is a context chunk used by the LLM to answer the prompt. Sources are only returned with the final chunk."
+                        }
+                      ],
+                      "close": true,
+                      "error": "null | text string of the failure mode."
+                    }
+                  ]
+                }
+              }
+            },
+            "description": "OK"
+          },
+          "400": {
+            "description": "Bad Request"
+          },
+          "403": {
+            "description": "Forbidden",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "$ref": "#/components/schemas/InvalidAPIKey"
+                }
+              },
+              "application/xml": {
+                "schema": {
+                  "$ref": "#/components/schemas/InvalidAPIKey"
+                }
+              }
+            }
+          }
+        },
+        "requestBody": {
+          "description": "Send a prompt to the workspace with the type of conversation (query or chat).<br/><b>Query:</b> Will not use the LLM unless there are relevant sources from the vector DB, and does not recall chat history.<br/><b>Chat:</b> Uses LLM general knowledge together with custom embeddings to produce output, and uses rolling chat history.",
+          "required": true,
+          "type": "object",
+          "content": {
+            "application/json": {
+              "example": {
+                "message": "What is AnythingLLM?",
+                "mode": "query | chat"
+              }
+            }
+          }
+        }
+      }
+    },
     "/v1/system/env-dump": {
       "get": {
         "tags": [
diff --git a/server/utils/chats/stream.js b/server/utils/chats/stream.js
index 11d4effd7..04bb72b90 100644
--- a/server/utils/chats/stream.js
+++ b/server/utils/chats/stream.js
@@ -8,6 +8,7 @@ const {
   chatPrompt,
 } = require(".");
 
+const VALID_CHAT_MODE = ["chat", "query"];
 function writeResponseChunk(response, data) {
   response.write(`data: ${JSON.stringify(data)}\n\n`);
   return;
@@ -503,6 +504,7 @@ function handleStreamResponses(response, stream, responseProps) {
 }
 
 module.exports = {
+  VALID_CHAT_MODE,
   streamChatWithWorkspace,
   writeResponseChunk,
 };
-- 
GitLab