From 2de9e492ec16ada194512e6aeac77969d69fa950 Mon Sep 17 00:00:00 2001
From: Timothy Carambat <rambat1010@gmail.com>
Date: Thu, 22 Aug 2024 13:12:09 -0700
Subject: [PATCH] Enabled use of `@agent` (and skills) via dev API calls
 (#2161)

* Use `@agent` via dev API

* Move EphemeralEventListener to same file as agent
---
 .../agents/aibitat/plugins/http-socket.js     |  87 +++++
 server/utils/agents/defaults.js               |  58 +--
 server/utils/agents/ephemeral.js              | 351 ++++++++++++++++++
 server/utils/agents/index.js                  |  24 +-
 server/utils/chats/apiChatHandler.js          | 110 ++++++
 5 files changed, 595 insertions(+), 35 deletions(-)
 create mode 100644 server/utils/agents/aibitat/plugins/http-socket.js
 create mode 100644 server/utils/agents/ephemeral.js

diff --git a/server/utils/agents/aibitat/plugins/http-socket.js b/server/utils/agents/aibitat/plugins/http-socket.js
new file mode 100644
index 000000000..30c68f694
--- /dev/null
+++ b/server/utils/agents/aibitat/plugins/http-socket.js
@@ -0,0 +1,87 @@
+const chalk = require("chalk");
+const { RetryError } = require("../error");
+const { Telemetry } = require("../../../../models/telemetry");
+
+/**
+ * HTTP interface plugin for Aibitat that emulates a websocket interface in the agent
+ * framework so we don't have to modify the interface for passing messages and responses
+ * in REST or WSS. `handler` must expose `send(jsonString)` and `close()`.
+ */
+const httpSocket = {
+  name: "httpSocket",
+  startupConfig: {
+    params: {
+      handler: {
+        required: true,
+      },
+      muteUserReply: {
+        required: false,
+        default: true,
+      },
+      introspection: {
+        required: false,
+        default: true,
+      },
+    },
+  },
+  plugin: function ({
+    handler,
+    muteUserReply = true, // Do not relay messages sent from "USER" back through the handler.
+    introspection = false, // when enabled, aibitat.introspect relays status updates through the handler. NOTE(review): startupConfig above declares `default: true` - confirm which is intended.
+  }) {
+    return {
+      name: this.name,
+      setup(aibitat) {
+        aibitat.onError(async (error) => {
+          if (!!error?.message) {
+            console.error(chalk.red(`   error: ${error.message}`), error);
+            aibitat.introspect(
+              `Error encountered while running: ${error.message}`
+            );
+          }
+
+          if (error instanceof RetryError) {
+            console.error(chalk.red(`   retrying in 60 seconds...`));
+            setTimeout(() => {
+              aibitat.retry();
+            }, 60_000);
+            return;
+          }
+        });
+
+        aibitat.introspect = (messageText) => {
+          if (!introspection) return; // Drop thoughts when introspection is disabled.
+          handler.send(
+            JSON.stringify({ type: "statusResponse", content: messageText })
+          );
+        };
+
+        // expose function for sockets across aibitat
+        // type param must be set or else msg will not be shown or handled in UI.
+        aibitat.socket = {
+          send: (type = "__unhandled", content = "") => {
+            handler.send(JSON.stringify({ type, content }));
+          },
+        };
+
+        // We can only return one message response over HTTP,
+        // so the handler is closed right after the first relayed message.
+        aibitat.onMessage((message) => {
+          if (message.from !== "USER")
+            Telemetry.sendTelemetry("agent_chat_sent");
+          if (message.from === "USER" && muteUserReply) return;
+          handler.send(JSON.stringify(message));
+          handler.close();
+        });
+
+        aibitat.onTerminate(() => {
+          handler.close();
+        });
+      },
+    };
+  },
+};
+
+module.exports = {
+  httpSocket,
+};
diff --git a/server/utils/agents/defaults.js b/server/utils/agents/defaults.js
index 796a7bbcb..a6d30ca15 100644
--- a/server/utils/agents/defaults.js
+++ b/server/utils/agents/defaults.js
@@ -22,36 +22,48 @@ const WORKSPACE_AGENT = {
       AgentPlugins.webScraping.name, // Collector web-scraping
     ];
 
-    const _setting = (
-      await SystemSettings.get({ label: "default_agent_skills" })
-    )?.value;
-
-    safeJsonParse(_setting, []).forEach((skillName) => {
-      if (!AgentPlugins.hasOwnProperty(skillName)) return;
-
-      // This is a plugin module with many sub-children plugins who
-      // need to be named via `${parent}#${child}` naming convention
-      if (Array.isArray(AgentPlugins[skillName].plugin)) {
-        for (const subPlugin of AgentPlugins[skillName].plugin) {
-          defaultFunctions.push(
-            `${AgentPlugins[skillName].name}#${subPlugin.name}`
-          );
-        }
-        return;
-      }
-
-      // This is normal single-stage plugin
-      defaultFunctions.push(AgentPlugins[skillName].name);
-    });
-
     return {
       role: Provider.systemPrompt(provider),
-      functions: defaultFunctions,
+      functions: [
+        ...defaultFunctions,
+        ...(await agentSkillsFromSystemSettings()),
+      ],
     };
   },
 };
 
+/**
+ * Reads the `default_agent_skills` system setting and resolves each enabled skill to
+ * its plugin name; multi-plugin modules expand to `${parent}#${child}` identifiers.
+ * @returns {Promise<string[]>} plugin identifiers; unknown skill names are skipped.
+ */
+async function agentSkillsFromSystemSettings() {
+  const systemFunctions = [];
+  const _setting = (await SystemSettings.get({ label: "default_agent_skills" }))
+    ?.value;
+
+  safeJsonParse(_setting, []).forEach((skillName) => {
+    if (!AgentPlugins.hasOwnProperty(skillName)) return;
+
+    // This is a plugin module with many sub-children plugins who
+    // need to be named via `${parent}#${child}` naming convention
+    if (Array.isArray(AgentPlugins[skillName].plugin)) {
+      for (const subPlugin of AgentPlugins[skillName].plugin) {
+        systemFunctions.push(
+          `${AgentPlugins[skillName].name}#${subPlugin.name}`
+        );
+      }
+      return;
+    }
+
+    // This is normal single-stage plugin
+    systemFunctions.push(AgentPlugins[skillName].name);
+  });
+  return systemFunctions;
+}
+
 module.exports = {
   USER_AGENT,
   WORKSPACE_AGENT,
+  agentSkillsFromSystemSettings,
 };
diff --git a/server/utils/agents/ephemeral.js b/server/utils/agents/ephemeral.js
new file mode 100644
index 000000000..831fa5032
--- /dev/null
+++ b/server/utils/agents/ephemeral.js
@@ -0,0 +1,351 @@
+const AIbitat = require("./aibitat");
+const AgentPlugins = require("./aibitat/plugins");
+const { httpSocket } = require("./aibitat/plugins/http-socket.js");
+const { WorkspaceChats } = require("../../models/workspaceChats");
+const { safeJsonParse } = require("../http");
+const {
+  USER_AGENT,
+  WORKSPACE_AGENT,
+  agentSkillsFromSystemSettings,
+} = require("./defaults");
+const { AgentHandler } = require(".");
+const {
+  WorkspaceAgentInvocation,
+} = require("../../models/workspaceAgentInvocation");
+
+/**
+ * This is an instance and functional Agent handler, but it does not utilize
+ * sessions or websockets and is instead a singular one-off agent run that does
+ * not persist between invocations.
+ */
+class EphemeralAgentHandler extends AgentHandler {
+  #invocationUUID = null;
+  #workspace = null;
+  #userId = null;
+  #threadId = null;
+  #sessionId = null;
+  #prompt = null;
+  #funcsToLoad = [];
+
+  aibitat = null;
+  channel = null;
+  provider = null;
+  model = null;
+
+  constructor({
+    uuid,
+    workspace,
+    prompt,
+    userId = null,
+    threadId = null,
+    sessionId = null,
+  }) {
+    super({ uuid });
+    this.#invocationUUID = uuid;
+    this.#workspace = workspace;
+    this.#prompt = prompt;
+
+    this.#userId = userId;
+    this.#threadId = threadId;
+    this.#sessionId = sessionId;
+  }
+
+  log(text, ...args) {
+    console.log(`\x1b[36m[EphemeralAgentHandler]\x1b[0m ${text}`, ...args);
+  }
+
+  closeAlert() {
+    this.log(`End ${this.#invocationUUID}::${this.provider}:${this.model}`);
+  }
+
+  async #chatHistory(limit = 10) {
+    try {
+      const rawHistory = (
+        await WorkspaceChats.where(
+          {
+            workspaceId: this.#workspace.id,
+            user_id: this.#userId || null,
+            thread_id: this.#threadId || null,
+            api_session_id: this.#sessionId,
+            include: true,
+          },
+          limit,
+          { id: "desc" }
+        )
+      ).reverse();
+
+      const agentHistory = [];
+      rawHistory.forEach((chatLog) => {
+        agentHistory.push(
+          {
+            from: USER_AGENT.name,
+            to: WORKSPACE_AGENT.name,
+            content: chatLog.prompt,
+            state: "success",
+          },
+          {
+            from: WORKSPACE_AGENT.name,
+            to: USER_AGENT.name,
+            content: safeJsonParse(chatLog.response)?.text || "",
+            state: "success",
+          }
+        );
+      });
+      return agentHistory;
+    } catch (e) {
+      this.log("Error loading chat history", e.message);
+      return [];
+    }
+  }
+
+  /**
+   * Finds or assumes the model preference value to use for API calls.
+   * If multi-model loading is supported, we use their agent model selection of the workspace
+   * If not supported, we attempt to fallback to the system provider value for the LLM preference
+   * and if that fails - we assume a reasonable base model to exist.
+   * @returns {string} the model preference value to use in API calls
+   */
+  #fetchModel() {
+    if (!Object.keys(this.noProviderModelDefault).includes(this.provider))
+      return this.#workspace.agentModel || this.providerDefault();
+
+    // Provider has no reliable default (can't load many models) - so we need to look at system
+    // for the model param.
+    const sysModelKey = this.noProviderModelDefault[this.provider];
+    if (!!sysModelKey)
+      return process.env[sysModelKey] ?? this.providerDefault();
+
+    // If all else fails - look at the provider default list
+    return this.providerDefault();
+  }
+
+  #providerSetupAndCheck() {
+    this.provider = this.#workspace.agentProvider;
+    this.model = this.#fetchModel();
+    this.log(`Start ${this.#invocationUUID}::${this.provider}:${this.model}`);
+    this.checkSetup();
+  }
+
+  #attachPlugins(args) {
+    for (const name of this.#funcsToLoad) {
+      // Load child plugin
+      if (name.includes("#")) {
+        const [parent, childPluginName] = name.split("#");
+        if (!AgentPlugins.hasOwnProperty(parent)) {
+          this.log(
+            `${parent} is not a valid plugin. Skipping inclusion to agent cluster.`
+          );
+          continue;
+        }
+
+        const childPlugin = AgentPlugins[parent].plugin.find(
+          (child) => child.name === childPluginName
+        );
+        if (!childPlugin) {
+          this.log(
+            `${parent} does not have child plugin named ${childPluginName}. Skipping inclusion to agent cluster.`
+          );
+          continue;
+        }
+
+        const callOpts = this.parseCallOptions(
+          args,
+          childPlugin?.startupConfig?.params,
+          name
+        );
+        this.aibitat.use(childPlugin.plugin(callOpts));
+        this.log(
+          `Attached ${parent}:${childPluginName} plugin to Agent cluster`
+        );
+        continue;
+      }
+
+      // Load single-stage plugin.
+      if (!AgentPlugins.hasOwnProperty(name)) {
+        this.log(
+          `${name} is not a valid plugin. Skipping inclusion to agent cluster.`
+        );
+        continue;
+      }
+
+      const callOpts = this.parseCallOptions(
+        args,
+        AgentPlugins[name].startupConfig.params
+      );
+      const AIbitatPlugin = AgentPlugins[name];
+      this.aibitat.use(AIbitatPlugin.plugin(callOpts));
+      this.log(`Attached ${name} plugin to Agent cluster`);
+    }
+  }
+
+  async #loadAgents() {
+    // Default User agent and workspace agent
+    this.log(`Attaching user and default agent to Agent cluster.`);
+    this.aibitat.agent(USER_AGENT.name, await USER_AGENT.getDefinition());
+    this.aibitat.agent(
+      WORKSPACE_AGENT.name,
+      await WORKSPACE_AGENT.getDefinition(this.provider)
+    );
+
+    this.#funcsToLoad = [
+      AgentPlugins.docSummarizer.name,
+      AgentPlugins.webScraping.name,
+      ...(await agentSkillsFromSystemSettings()),
+    ];
+  }
+
+  async init() {
+    this.#providerSetupAndCheck();
+    return this;
+  }
+
+  /**
+   * Builds the AIbitat instance and attaches the HTTP socket plugin, agents, and skills.
+   * @param {{handler: {send: Function, close: Function}}} args - `handler` is required.
+   */
+  async createAIbitat(args = {}) {
+    if (!args?.handler)
+      throw new Error("createAIbitat requires a `handler` to relay events.");
+    this.aibitat = new AIbitat({
+      provider: this.provider ?? "openai",
+      model: this.model ?? "gpt-4o",
+      chats: await this.#chatHistory(20),
+      handlerProps: {
+        log: this.log,
+      },
+    });
+
+    // Attach HTTP response object if defined for chunk streaming.
+    this.log(`Attached ${httpSocket.name} plugin to Agent cluster`);
+    this.aibitat.use(
+      httpSocket.plugin({
+        handler: args.handler,
+        muteUserReply: true,
+        introspection: true,
+      })
+    );
+
+    // Load required agents (Default + custom)
+    await this.#loadAgents();
+
+    // Attach all required plugins for functions to operate.
+    this.#attachPlugins(args);
+  }
+
+  startAgentCluster() {
+    return this.aibitat.start({
+      from: USER_AGENT.name,
+      to: this.channel ?? WORKSPACE_AGENT.name,
+      content: this.#prompt,
+    });
+  }
+
+  /**
+   * Determine if the message provided is an agent invocation.
+   * @param {{message:string}} parameters
+   * @returns {boolean}
+   */
+  static isAgentInvocation({ message }) {
+    return WorkspaceAgentInvocation.parseAgents(message).length > 0;
+  }
+}
+
+const EventEmitter = require("node:events");
+const { writeResponseChunk } = require("../helpers/chat/responses");
+
+/**
+ * This is a special EventEmitter specifically used in the Aibitat agent handler
+ * that enables us to use HTTP to relay all .introspect and .send events back to an
+ * http handler instead of websockets, like we do on the frontend. This interface is meant to
+ * mock a websocket interface for the methods used and bind them to an HTTP method so that the developer
+ * API can invoke agent calls.
+ */
+class EphemeralEventListener extends EventEmitter {
+  messages = [];
+  constructor() {
+    super();
+  }
+
+  send(jsonData) {
+    const data = JSON.parse(jsonData);
+    this.messages.push(data);
+    this.emit("chunk", data);
+  }
+
+  close() {
+    this.emit("closed");
+  }
+
+  /**
+   * Splits stored messages into "statusResponse" thoughts and the last other message's content.
+   * @returns {{thoughts: string[], textResponse: string|null}} textResponse stays null if no non-status message was sent.
+   */
+  packMessages() {
+    const thoughts = [];
+    let textResponse = null;
+    for (let msg of this.messages) {
+      if (msg.type !== "statusResponse") {
+        textResponse = msg.content;
+      } else {
+        thoughts.push(msg.content);
+      }
+    }
+    return { thoughts, textResponse };
+  }
+
+  /**
+   * Waits on the HTTP plugin to emit the 'closed' event from the agentHandler
+   * so that we can compact and return all the messages in the current queue.
+   * @returns {Promise<{thoughts: string[], textResponse: string|null}>}
+   */
+  async waitForClose() {
+    return new Promise((resolve) => {
+      this.once("closed", () => resolve(this.packMessages()));
+    });
+  }
+
+  /**
+   * Streams the events with `writeResponseChunk` over HTTP chunked encoding
+   * and returns on the close event emission.
+   * ----------
+   * DevNote: Agents do not stream so in here we are simply
+   * emitting the thoughts and text response as soon as we get them.
+   * @param {import("express").Response} response
+   * @param {string} uuid - Unique identifier that is the same across chunks.
+   * @returns {Promise<{thoughts: string[], textResponse: string|null}>}
+   */
+  async streamAgentEvents(response, uuid) {
+    const onChunkHandler = (data) => {
+      if (data.type === "statusResponse") {
+        return writeResponseChunk(response, {
+          id: uuid,
+          type: "agentThought",
+          thought: data.content,
+          sources: [],
+          attachments: [],
+          close: false,
+          error: null,
+        });
+      }
+
+      return writeResponseChunk(response, {
+        id: uuid,
+        type: "textResponse",
+        textResponse: data.content,
+        sources: [],
+        attachments: [],
+        close: true,
+        error: null,
+      });
+    };
+    this.on("chunk", onChunkHandler);
+
+    // Wait for close and after remove chunk listener
+    return this.waitForClose().then((closedResponse) => {
+      this.removeListener("chunk", onChunkHandler);
+      return closedResponse;
+    });
+  }
+}
+
+module.exports = { EphemeralAgentHandler, EphemeralEventListener };
diff --git a/server/utils/agents/index.js b/server/utils/agents/index.js
index b0654eae1..86563d185 100644
--- a/server/utils/agents/index.js
+++ b/server/utils/agents/index.js
@@ -10,7 +10,7 @@ const { USER_AGENT, WORKSPACE_AGENT } = require("./defaults");
 class AgentHandler {
   #invocationUUID;
   #funcsToLoad = [];
-  #noProviderModelDefault = {
+  noProviderModelDefault = {
     azure: "OPEN_MODEL_PREF",
     lmstudio: "LMSTUDIO_MODEL_PREF",
     textgenwebui: null, // does not even use `model` in API req
@@ -74,7 +74,7 @@ class AgentHandler {
     }
   }
 
-  #checkSetup() {
+  checkSetup() {
     switch (this.provider) {
       case "openai":
         if (!process.env.OPEN_AI_KEY)
@@ -163,7 +163,7 @@ class AgentHandler {
     }
   }
 
-  #providerDefault() {
+  providerDefault() {
     switch (this.provider) {
       case "openai":
         return "gpt-4o";
@@ -210,24 +210,24 @@ class AgentHandler {
    * @returns {string} the model preference value to use in API calls
    */
   #fetchModel() {
-    if (!Object.keys(this.#noProviderModelDefault).includes(this.provider))
-      return this.invocation.workspace.agentModel || this.#providerDefault();
+    if (!Object.keys(this.noProviderModelDefault).includes(this.provider))
+      return this.invocation.workspace.agentModel || this.providerDefault();
 
     // Provider has no reliable default (cant load many models) - so we need to look at system
     // for the model param.
-    const sysModelKey = this.#noProviderModelDefault[this.provider];
+    const sysModelKey = this.noProviderModelDefault[this.provider];
     if (!!sysModelKey)
-      return process.env[sysModelKey] ?? this.#providerDefault();
+      return process.env[sysModelKey] ?? this.providerDefault();
 
     // If all else fails - look at the provider default list
-    return this.#providerDefault();
+    return this.providerDefault();
   }
 
   #providerSetupAndCheck() {
     this.provider = this.invocation.workspace.agentProvider;
     this.model = this.#fetchModel();
     this.log(`Start ${this.#invocationUUID}::${this.provider}:${this.model}`);
-    this.#checkSetup();
+    this.checkSetup();
   }
 
   async #validInvocation() {
@@ -239,7 +239,7 @@ class AgentHandler {
     this.invocation = invocation ?? null;
   }
 
-  #parseCallOptions(args, config = {}, pluginName) {
+  parseCallOptions(args, config = {}, pluginName) {
     const callOpts = {};
     for (const [param, definition] of Object.entries(config)) {
       if (
@@ -280,7 +280,7 @@ class AgentHandler {
           continue;
         }
 
-        const callOpts = this.#parseCallOptions(
+        const callOpts = this.parseCallOptions(
           args,
           childPlugin?.startupConfig?.params,
           name
@@ -300,7 +300,7 @@ class AgentHandler {
         continue;
       }
 
-      const callOpts = this.#parseCallOptions(
+      const callOpts = this.parseCallOptions(
         args,
         AgentPlugins[name].startupConfig.params
       );
diff --git a/server/utils/chats/apiChatHandler.js b/server/utils/chats/apiChatHandler.js
index bce341bac..3fa475ca1 100644
--- a/server/utils/chats/apiChatHandler.js
+++ b/server/utils/chats/apiChatHandler.js
@@ -4,6 +4,11 @@ const { WorkspaceChats } = require("../../models/workspaceChats");
 const { getVectorDbClass, getLLMProvider } = require("../helpers");
 const { writeResponseChunk } = require("../helpers/chat/responses");
 const { chatPrompt, sourceIdentifier, recentChatHistory } = require("./index");
+const {
+  EphemeralAgentHandler,
+  EphemeralEventListener,
+} = require("../agents/ephemeral");
+const { Telemetry } = require("../../models/telemetry");
 
 /**
  * @typedef ResponseObject
@@ -37,6 +42,59 @@ async function chatSync({
 }) {
   const uuid = uuidv4();
   const chatMode = mode ?? "chat";
+
+  if (EphemeralAgentHandler.isAgentInvocation({ message })) {
+    await Telemetry.sendTelemetry("agent_chat_started");
+
+    // Initialize the EphemeralAgentHandler to handle non-continuous
+    // conversations with agents since this is over REST.
+    const agentHandler = new EphemeralAgentHandler({
+      uuid,
+      workspace,
+      prompt: message,
+      userId: user?.id || null,
+      threadId: thread?.id || null,
+      sessionId,
+    });
+
+    // Establish event listener that emulates websocket calls
+    // in Aibitat so that we can keep the same interface in Aibitat
+    // but use HTTP.
+    const eventListener = new EphemeralEventListener();
+    await agentHandler.init();
+    await agentHandler.createAIbitat({ handler: eventListener });
+    agentHandler.startAgentCluster();
+
+    // The cluster has started and now we wait for close event since
+    // this is a synchronous call for an agent, so we return everything at once.
+    // After this, we conclude the call as we normally do.
+    return await eventListener
+      .waitForClose()
+      .then(async ({ thoughts, textResponse }) => {
+        await WorkspaceChats.new({
+          workspaceId: workspace.id,
+          prompt: String(message),
+          response: {
+            text: textResponse,
+            sources: [],
+            type: chatMode,
+            thoughts,
+          },
+          include: false,
+          apiSessionId: sessionId,
+        });
+        return {
+          id: uuid,
+          type: "textResponse",
+          sources: [],
+          close: true,
+          error: null,
+          textResponse,
+          thoughts,
+        };
+      });
+  }
+
   const LLMConnector = getLLMProvider({
     provider: workspace?.chatProvider,
     model: workspace?.chatModel,
@@ -257,6 +315,58 @@ async function streamChat({
 }) {
   const uuid = uuidv4();
   const chatMode = mode ?? "chat";
+
+  if (EphemeralAgentHandler.isAgentInvocation({ message })) {
+    await Telemetry.sendTelemetry("agent_chat_started");
+
+    // Initialize the EphemeralAgentHandler to handle non-continuous
+    // conversations with agents since this is over REST.
+    const agentHandler = new EphemeralAgentHandler({
+      uuid,
+      workspace,
+      prompt: message,
+      userId: user?.id || null,
+      threadId: thread?.id || null,
+      sessionId,
+    });
+
+    // Establish event listener that emulates websocket calls
+    // in Aibitat so that we can keep the same interface in Aibitat
+    // but use HTTP.
+    const eventListener = new EphemeralEventListener();
+    await agentHandler.init();
+    await agentHandler.createAIbitat({ handler: eventListener });
+    agentHandler.startAgentCluster();
+
+    // The cluster has started and now we wait for the close event
+    // and stream back any results we get from agents as they come in.
+    return eventListener
+      .streamAgentEvents(response, uuid)
+      .then(async ({ thoughts, textResponse }) => {
+        console.log({ thoughts, textResponse });
+        await WorkspaceChats.new({
+          workspaceId: workspace.id,
+          prompt: String(message),
+          response: {
+            text: textResponse,
+            sources: [],
+            type: chatMode,
+            thoughts,
+          },
+          include: false,
+          apiSessionId: sessionId,
+        });
+        writeResponseChunk(response, {
+          uuid,
+          type: "finalizeResponseStream",
+          textResponse,
+          thoughts,
+          close: true,
+          error: false,
+        });
+      });
+  }
+
   const LLMConnector = getLLMProvider({
     provider: workspace?.chatProvider,
     model: workspace?.chatModel,
-- 
GitLab