timothycarambat authoredtimothycarambat authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ephemeral.js 13.51 KiB
const AIbitat = require("./aibitat");
const AgentPlugins = require("./aibitat/plugins");
const ImportedPlugin = require("./imported");
const { httpSocket } = require("./aibitat/plugins/http-socket.js");
const { WorkspaceChats } = require("../../models/workspaceChats");
const { safeJsonParse } = require("../http");
const {
} = require("./defaults");
const { AgentHandler } = require(".");
const {
} = require("../../models/workspaceAgentInvocation");
* This is an instance and functional Agent handler, but it does not utilize
* sessions or websocket's and is instead a singular one-off agent run that does
* not persist between invocations
class EphemeralAgentHandler extends AgentHandler {
/** @type {string|null} the unique identifier for the agent invocation */
#invocationUUID = null;
/** @type {import("@prisma/client").workspaces|null} the workspace to use for the agent */
#workspace = null;
/** @type {import("@prisma/client").users|null} the user id to use for the agent */
#userId = null;
/** @type {import("@prisma/client").workspace_threads|null} the workspace thread id to use for the agent */
#threadId = null;
/** @type {string|null} the session id to use for the agent */
#sessionId = null;
/** @type {string|null} the prompt to use for the agent */
#prompt = null;
/** @type {string[]} the functions to load into the agent (Aibitat plugins) */
#funcsToLoad = [];
/** @type {AIbitat|null} */
aibitat = null;
/** @type {string|null} */
channel = null;
/** @type {string|null} */
provider = null;
/** @type {string|null} the model to use for the agent */
model = null;
* @param {{
* uuid: string,
* workspace: import("@prisma/client").workspaces,
* prompt: string,
* userId: import("@prisma/client").users["id"]|null,
* threadId: import("@prisma/client").workspace_threads["id"]|null,
* sessionId: string|null
* }} parameters
userId = null,
threadId = null,
sessionId = null,
}) {
super({ uuid });
this.#invocationUUID = uuid;
this.#workspace = workspace;
this.#prompt = prompt;
this.#userId = userId;
this.#threadId = threadId;
this.#sessionId = sessionId;
log(text, ...args) {
console.log(`\x1b[36m[EphemeralAgentHandler]\x1b[0m ${text}`, ...args);
closeAlert() {
this.log(`End ${this.#invocationUUID}::${this.provider}:${this.model}`);
async #chatHistory(limit = 10) {
try {
const rawHistory = (
await WorkspaceChats.where(
workspaceId: this.#workspace.id,
user_id: this.#userId || null,
thread_id: this.#threadId || null,
api_session_id: this.#sessionId,
include: true,
{ id: "desc" }
const agentHistory = [];
rawHistory.forEach((chatLog) => {
from: USER_AGENT.name,
content: chatLog.prompt,
state: "success",
to: USER_AGENT.name,
content: safeJsonParse(chatLog.response)?.text || "",
state: "success",
return agentHistory;
} catch (e) {
this.log("Error loading chat history", e.message);
return [];
* Attempts to find a fallback provider and model to use if the workspace
* does not have an explicit `agentProvider` and `agentModel` set.
* 1. Fallback to the workspace `chatProvider` and `chatModel` if they exist.
* 2. Fallback to the system `LLM_PROVIDER` and try to load the the associated default model via ENV params or a base available model.
* 3. Otherwise, return null - will likely throw an error the user can act on.
* @returns {object|null} - An object with provider and model keys.
#getFallbackProvider() {
// First, fallback to the workspace chat provider and model if they exist
if (this.#workspace.chatProvider && this.#workspace.chatModel) {
return {
provider: this.#workspace.chatProvider,
model: this.#workspace.chatModel,
// If workspace does not have chat provider and model fallback
// to system provider and try to load provider default model
const systemProvider = process.env.LLM_PROVIDER;
const systemModel = this.providerDefault(systemProvider);
if (systemProvider && systemModel) {
return {
provider: systemProvider,
model: systemModel,
return null;
* Finds or assumes the model preference value to use for API calls.
* If multi-model loading is supported, we use their agent model selection of the workspace
* If not supported, we attempt to fallback to the system provider value for the LLM preference
* and if that fails - we assume a reasonable base model to exist.
* @returns {string|null} the model preference value to use in API calls
#fetchModel() {
// Provider was not explicitly set for workspace, so we are going to run our fallback logic
// that will set a provider and model for us to use.
if (!this.provider) {
const fallback = this.#getFallbackProvider();
if (!fallback) throw new Error("No valid provider found for the agent.");
this.provider = fallback.provider; // re-set the provider to the fallback provider so it is not null.
return fallback.model; // set its defined model based on fallback logic.
// The provider was explicitly set, so check if the workspace has an agent model set.
if (this.#workspace.agentModel) return this.#workspace.agentModel;
// Otherwise, we have no model to use - so guess a default model to use via the provider
// and it's system ENV params and if that fails - we return either a base model or null.
return this.providerDefault();
#providerSetupAndCheck() {
this.provider = this.#workspace.agentProvider ?? null;
this.model = this.#fetchModel();
if (!this.provider)
throw new Error("No valid provider found for the agent.");
this.log(`Start ${this.#invocationUUID}::${this.provider}:${this.model}`);
#attachPlugins(args) {
for (const name of this.#funcsToLoad) {
// Load child plugin
if (name.includes("#")) {
const [parent, childPluginName] = name.split("#");
if (!AgentPlugins.hasOwnProperty(parent)) {
`${parent} is not a valid plugin. Skipping inclusion to agent cluster.`
const childPlugin = AgentPlugins[parent].plugin.find(
(child) => child.name === childPluginName
if (!childPlugin) {
`${parent} does not have child plugin named ${childPluginName}. Skipping inclusion to agent cluster.`
const callOpts = this.parseCallOptions(
`Attached ${parent}:${childPluginName} plugin to Agent cluster`
// Load imported plugin. This is marked by `@@` in the array of functions to load.
// and is the @@hubID of the plugin.
if (name.startsWith("@@")) {
const hubId = name.replace("@@", "");
const valid = ImportedPlugin.validateImportedPluginHandler(hubId);
if (!valid) {
`Imported plugin by hubId ${hubId} not found in plugin directory. Skipping inclusion to agent cluster.`
const plugin = ImportedPlugin.loadPluginByHubId(hubId);
const callOpts = plugin.parseCallOptions();
`Attached ${plugin.name} (${hubId}) imported plugin to Agent cluster`
// Load single-stage plugin.
if (!AgentPlugins.hasOwnProperty(name)) {
`${name} is not a valid plugin. Skipping inclusion to agent cluster.`
const callOpts = this.parseCallOptions(
const AIbitatPlugin = AgentPlugins[name];
this.log(`Attached ${name} plugin to Agent cluster`);
async #loadAgents() {
// Default User agent and workspace agent
this.log(`Attaching user and default agent to Agent cluster.`);
this.aibitat.agent(USER_AGENT.name, await USER_AGENT.getDefinition());
await WORKSPACE_AGENT.getDefinition(this.provider)
this.#funcsToLoad = [
...(await agentSkillsFromSystemSettings()),
...(await ImportedPlugin.activeImportedPlugins()),
async init() {
return this;
async createAIbitat(
args = {
) {
this.aibitat = new AIbitat({
provider: this.provider ?? "openai",
model: this.model ?? "gpt-4o",
chats: await this.#chatHistory(20),
handlerProps: {
invocation: {
workspace: this.#workspace,
workspace_id: this.#workspace.id,
log: this.log,
// Attach HTTP response object if defined for chunk streaming.
this.log(`Attached ${httpSocket.name} plugin to Agent cluster`);
handler: args.handler,
muteUserReply: true,
introspection: true,
// Load required agents (Default + custom)
await this.#loadAgents();
// Attach all required plugins for functions to operate.
startAgentCluster() {
return this.aibitat.start({
from: USER_AGENT.name,
to: this.channel ?? WORKSPACE_AGENT.name,
content: this.#prompt,
* Determine if the message provided is an agent invocation.
* @param {{message:string}} parameters
* @returns {boolean}
static isAgentInvocation({ message }) {
const agentHandles = WorkspaceAgentInvocation.parseAgents(message);
if (agentHandles.length > 0) return true;
return false;
const EventEmitter = require("node:events");
const { writeResponseChunk } = require("../helpers/chat/responses");
* This is a special EventEmitter specifically used in the Aibitat agent handler
* that enables us to use HTTP to relay all .introspect and .send events back to an
* http handler instead of websockets, like we do on the frontend. This interface is meant to
* mock a websocket interface for the methods used and bind them to an HTTP method so that the developer
* API can invoke agent calls.
class EphemeralEventListener extends EventEmitter {
messages = [];
constructor() {
send(jsonData) {
const data = JSON.parse(jsonData);
this.emit("chunk", data);
close() {
* Compacts all messages in class and returns them in a condensed format.
* @returns {{thoughts: string[], textResponse: string}}
packMessages() {
const thoughts = [];
let textResponse = null;
for (let msg of this.messages) {
if (msg.type !== "statusResponse") {
textResponse = msg.content;
} else {
return { thoughts, textResponse };
* Waits on the HTTP plugin to emit the 'closed' event from the agentHandler
* so that we can compact and return all the messages in the current queue.
* @returns {Promise<{thoughts: string[], textResponse: string}>}
async waitForClose() {
return new Promise((resolve) => {
this.once("closed", () => resolve(this.packMessages()));
* Streams the events with `writeResponseChunk` over HTTP chunked encoding
* and returns on the close event emission.
* ----------
* DevNote: Agents do not stream so in here we are simply
* emitting the thoughts and text response as soon as we get them.
* @param {import("express").Response} response
* @param {string} uuid - Unique identifier that is the same across chunks.
* @returns {Promise<{thoughts: string[], textResponse: string}>}
async streamAgentEvents(response, uuid) {
const onChunkHandler = (data) => {
if (data.type === "statusResponse") {
return writeResponseChunk(response, {
id: uuid,
type: "agentThought",
thought: data.content,
sources: [],
attachments: [],
close: false,
error: null,
return writeResponseChunk(response, {
id: uuid,
type: "textResponse",
textResponse: data.content,
sources: [],
attachments: [],
close: true,
error: null,
this.on("chunk", onChunkHandler);
// Wait for close and after remove chunk listener
return this.waitForClose().then((closedResponse) => {
this.removeListener("chunk", onChunkHandler);
return closedResponse;
module.exports = { EphemeralAgentHandler, EphemeralEventListener };