diff --git a/packages/server-next/next/index.ts b/packages/server-next/next/index.ts
new file mode 100644
index 0000000000000000000000000000000000000000..364a925708ce78c5848761474dba363584df841b
--- /dev/null
+++ b/packages/server-next/next/index.ts
@@ -0,0 +1 @@
+export * from "./server";
diff --git a/packages/server/package.json b/packages/server/package.json
deleted file mode 100644
index 459abb782305bac7f3943b046e37ca205e0319dc..0000000000000000000000000000000000000000
--- a/packages/server/package.json
+++ /dev/null
@@ -1,51 +0,0 @@
-{
-  "name": "@llamaindex/server",
-  "description": "LlamaIndex Server",
-  "version": "0.0.1",
-  "type": "module",
-  "main": "./dist/index.cjs",
-  "module": "./dist/index.js",
-  "exports": {
-    ".": {
-      "require": {
-        "types": "./dist/index.d.cts",
-        "default": "./dist/index.cjs"
-      },
-      "import": {
-        "types": "./dist/index.d.ts",
-        "default": "./dist/index.js"
-      }
-    }
-  },
-  "files": [
-    "dist"
-  ],
-  "repository": {
-    "type": "git",
-    "url": "git+https://github.com/run-llama/LlamaIndexTS.git",
-    "directory": "packages/server"
-  },
-  "scripts": {
-    "build": "bunchee",
-    "dev": "bunchee --watch",
-    "test": "vitest run"
-  },
-  "devDependencies": {
-    "bunchee": "6.4.0",
-    "vitest": "^2.1.5",
-    "@types/node": "^22.9.0",
-    "@types/express": "^4.17.21"
-  },
-  "dependencies": {
-    "@llamaindex/core": "workspace:*",
-    "@llamaindex/env": "workspace:*",
-    "@llamaindex/tools": "workspace:*",
-    "llamaindex": "workspace:*",
-    "ai": "^4.0.3",
-    "@llamaindex/openai": "^0.1.52",
-    "@llamaindex/readers": "^2.0.0"
-  },
-  "peerDependencies": {
-    "express": "^4.18.2"
-  }
-}
diff --git a/packages/server/src/helper.ts b/packages/server/src/helper.ts
deleted file mode 100644
index 88e30256f7f4e9cc4cdbba2b43a15c23974b71aa..0000000000000000000000000000000000000000
--- a/packages/server/src/helper.ts
+++ /dev/null
@@ -1,25 +0,0 @@
-import type { Message } from "ai";
-import type { Response as ExpressResponse } from "express";
-
-export async function pipeExpressResponse(
-  expressResponse: ExpressResponse,
-  streamResponse: Response,
-) {
-  if (!streamResponse.body) return;
-  const reader = streamResponse.body.getReader();
-  while (true) {
-    const { done, value } = await reader.read();
-    if (done) return expressResponse.end();
-    expressResponse.write(value);
-  }
-}
-
-export function getUserMessageContent(messages: Message[]): string {
-  const userMessageContent = messages[messages.length - 1]?.content;
-  if (!userMessageContent) {
-    throw new Error(
-      "Messages are required and the last message must be from the user",
-    );
-  }
-  return userMessageContent;
-}
diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts
deleted file mode 100644
index 14aebb5ec478c66228eadb3bed3e31914ed970bc..0000000000000000000000000000000000000000
--- a/packages/server/src/index.ts
+++ /dev/null
@@ -1,4 +0,0 @@
-export * from "./server";
-
-export * from "./workflow/tools"; // helper functions for tools
-export * from "./workflow/type"; // type definitions for the workflow
diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts
deleted file mode 100644
index 338a17ad610004892ae6116c9812dec3bfc74036..0000000000000000000000000000000000000000
--- a/packages/server/src/server.ts
+++ /dev/null
@@ -1,48 +0,0 @@
-import express from "express";
-import type { ChatMessage } from "llamaindex";
-import { pipeExpressResponse } from "./helper";
-import { chatWithWorkflow } from "./workflow/stream";
-import type { ServerWorkflow } from "./workflow/type";
-
-export interface LlamaIndexServerParams {
-  workflow: ServerWorkflow;
-  port?: number;
-}
-
-export class LlamaIndexServer {
-  app: express.Application;
-  workflow: ServerWorkflow;
-  port: number;
-
-  constructor({ workflow, port = 3000 }: LlamaIndexServerParams) {
-    this.app = express();
-    this.workflow = workflow;
-    this.port = port;
-    this.setupRoutes();
-  }
-
-  public chatController = async (
-    req: express.Request,
-    res: express.Response,
-  ) => {
-    try {
-      const { messages } = req.body as { messages: ChatMessage[] };
-      const streamResponse = await chatWithWorkflow(this.workflow, messages);
-      await pipeExpressResponse(res, streamResponse);
-    } catch (error) {
-      console.error("Chat error:", error);
-      res.status(500).json({ error: "Internal server error" });
-    }
-  };
-
-  protected setupRoutes() {
-    this.app.use(express.json());
-    this.app.post("/api/chat", this.chatController);
-  }
-
-  public start() {
-    this.app.listen(this.port, () => {
-      console.log(`LlamaIndex server running on port ${this.port}`);
-    });
-  }
-}
diff --git a/packages/server/src/workflow/stream.ts b/packages/server/src/workflow/stream.ts
deleted file mode 100644
index 6996d8d467eb75d3cf92258ef8e80f81abbdc869..0000000000000000000000000000000000000000
--- a/packages/server/src/workflow/stream.ts
+++ /dev/null
@@ -1,84 +0,0 @@
-import { LlamaIndexAdapter, StreamData, type JSONValue } from "ai";
-import {
-  EngineResponse,
-  StopEvent,
-  Workflow,
-  WorkflowContext,
-  WorkflowEvent,
-  type ChatMessage,
-  type ChatResponseChunk,
-} from "llamaindex";
-import { ReadableStream } from "stream/web";
-import { AgentRunEvent, type AgentInput } from "./type";
-
-export async function chatWithWorkflow(
-  workflow: Workflow<null, AgentInput, ChatResponseChunk>,
-  messages: ChatMessage[],
-): Promise<Response> {
-  const context = workflow.run({ messages });
-  const { stream, dataStream } = await createStreamFromWorkflowContext(context);
-  const response = LlamaIndexAdapter.toDataStreamResponse(stream, {
-    data: dataStream,
-  });
-  return response;
-}
-
-async function createStreamFromWorkflowContext<Input, Output, Context>(
-  context: WorkflowContext<Input, Output, Context>,
-): Promise<{ stream: ReadableStream<EngineResponse>; dataStream: StreamData }> {
-  const dataStream = new StreamData();
-  let generator: AsyncGenerator<ChatResponseChunk> | undefined;
-
-  const closeStreams = (controller: ReadableStreamDefaultController) => {
-    controller.close();
-    dataStream.close();
-  };
-
-  const stream = new ReadableStream<EngineResponse>({
-    async start(controller) {
-      // Kickstart the stream by sending an empty string
-      controller.enqueue({ delta: "" } as EngineResponse);
-    },
-    async pull(controller) {
-      while (!generator) {
-        // get next event from workflow context
-        const { value: event, done } =
-          await context[Symbol.asyncIterator]().next();
-        if (done) {
-          closeStreams(controller);
-          return;
-        }
-        generator = handleEvent(event, dataStream);
-      }
-
-      const { value: chunk, done } = await generator.next();
-      if (done) {
-        closeStreams(controller);
-        return;
-      }
-      const delta = chunk.delta ?? "";
-      if (delta) {
-        controller.enqueue({ delta } as EngineResponse);
-      }
-    },
-  });
-
-  return { stream, dataStream };
-}
-
-function handleEvent(
-  event: WorkflowEvent<unknown>,
-  dataStream: StreamData,
-): AsyncGenerator<ChatResponseChunk> | undefined {
-  // Handle for StopEvent
-  if (event instanceof StopEvent) {
-    return event.data as AsyncGenerator<ChatResponseChunk>;
-  }
-  // Handle for AgentRunEvent
-  if (event instanceof AgentRunEvent) {
-    dataStream.appendMessageAnnotation({
-      type: "agent",
-      data: event.data as JSONValue,
-    });
-  }
-}
diff --git a/packages/server/src/workflow/tools.ts b/packages/server/src/workflow/tools.ts
deleted file mode 100644
index 4e46336616efb52bc9c94ada10ab72d1ce3cf2f1..0000000000000000000000000000000000000000
--- a/packages/server/src/workflow/tools.ts
+++ /dev/null
@@ -1,294 +0,0 @@
-import {
-  type BaseToolWithCall,
-  callTool,
-  type ChatMessage,
-  type ChatResponse,
-  type ChatResponseChunk,
-  type HandlerContext,
-  type PartialToolCall,
-  type ToolCall,
-  ToolCallLLM,
-  type ToolCallLLMMessageOptions,
-} from "llamaindex";
-import crypto from "node:crypto";
-import { AgentRunEvent } from "./type";
-
-/**
- * Call multiple tools and return the tool messages
- */
-export const callTools = async <T>({
-  tools,
-  toolCalls,
-  ctx,
-  agentName,
-  writeEvent = true,
-}: {
-  toolCalls: ToolCall[];
-  tools: BaseToolWithCall[];
-  ctx: HandlerContext<T>;
-  agentName: string;
-  writeEvent?: boolean;
-}): Promise<ChatMessage[]> => {
-  const toolMsgs: ChatMessage[] = [];
-  if (toolCalls.length === 0) {
-    return toolMsgs;
-  }
-  if (toolCalls.length === 1 && toolCalls[0]) {
-    const tool = tools.find(
-      (tool) => tool.metadata.name === toolCalls[0]!.name,
-    );
-    if (!tool) {
-      throw new Error(`Tool ${toolCalls[0].name} not found`);
-    }
-    return [
-      await callSingleTool(
-        tool,
-        toolCalls[0],
-        writeEvent
-          ? (msg: string) => {
-              ctx.sendEvent(
-                new AgentRunEvent({
-                  agent: agentName,
-                  text: msg,
-                  type: "text",
-                }),
-              );
-            }
-          : undefined,
-      ),
-    ];
-  }
-  // Multiple tool calls, show events in progress
-  const progressId = crypto.randomUUID();
-  const totalSteps = toolCalls.length;
-  let currentStep = 0;
-  for (const toolCall of toolCalls) {
-    const tool = tools.find((tool) => tool.metadata.name === toolCall.name);
-    if (!tool) {
-      throw new Error(`Tool ${toolCall.name} not found`);
-    }
-    const toolMsg = await callSingleTool(tool, toolCall, (msg: string) => {
-      ctx.sendEvent(
-        new AgentRunEvent({
-          agent: agentName,
-          text: msg,
-          type: "progress",
-          data: {
-            id: progressId,
-            total: totalSteps,
-            current: currentStep,
-          },
-        }),
-      );
-      currentStep++;
-    });
-    toolMsgs.push(toolMsg);
-  }
-  return toolMsgs;
-};
-
-export const callSingleTool = async (
-  tool: BaseToolWithCall,
-  toolCall: ToolCall,
-  eventEmitter?: (msg: string) => void,
-): Promise<ChatMessage> => {
-  if (eventEmitter) {
-    eventEmitter(
-      `Calling tool ${toolCall.name} with input: ${JSON.stringify(toolCall.input)}`,
-    );
-  }
-
-  const toolOutput = await callTool(tool, toolCall, {
-    log: () => {},
-    error: (...args: unknown[]) => {
-      console.error(`Tool ${toolCall.name} got error:`, ...args);
-      if (eventEmitter) {
-        eventEmitter(`Tool ${toolCall.name} got error: ${args.join(" ")}`);
-      }
-      return {
-        content: JSON.stringify({
-          error: args.join(" "),
-        }),
-        role: "user",
-        options: {
-          toolResult: {
-            id: toolCall.id,
-            result: JSON.stringify({
-              error: args.join(" "),
-            }),
-            isError: true,
-          },
-        },
-      };
-    },
-    warn: () => {},
-  });
-
-  return {
-    content: JSON.stringify(toolOutput.output),
-    role: "user",
-    options: {
-      toolResult: {
-        result: toolOutput.output,
-        isError: toolOutput.isError,
-        id: toolCall.id,
-      },
-    },
-  };
-};
-
-class ChatWithToolsResponse {
-  toolCalls: ToolCall[];
-  toolCallMessage?: ChatMessage;
-  responseGenerator?: AsyncGenerator<ChatResponseChunk>;
-
-  constructor(options: {
-    toolCalls: ToolCall[];
-    toolCallMessage?: ChatMessage;
-    responseGenerator?: AsyncGenerator<ChatResponseChunk>;
-  }) {
-    this.toolCalls = options.toolCalls;
-    if (options.toolCallMessage) {
-      this.toolCallMessage = options.toolCallMessage;
-    }
-    if (options.responseGenerator) {
-      this.responseGenerator = options.responseGenerator;
-    }
-  }
-
-  hasMultipleTools() {
-    const uniqueToolNames = new Set(this.getToolNames());
-    return uniqueToolNames.size > 1;
-  }
-
-  hasToolCall() {
-    return this.toolCalls.length > 0;
-  }
-
-  getToolNames() {
-    return this.toolCalls.map((toolCall) => toolCall.name);
-  }
-
-  async asFullResponse(): Promise<ChatMessage> {
-    if (!this.responseGenerator) {
-      throw new Error("No response generator");
-    }
-    let fullResponse = "";
-    for await (const chunk of this.responseGenerator) {
-      fullResponse += chunk.delta;
-    }
-    return {
-      role: "assistant",
-      content: fullResponse,
-    };
-  }
-}
-
-export const chatWithTools = async (
-  llm: ToolCallLLM,
-  tools: BaseToolWithCall[],
-  messages: ChatMessage[],
-): Promise<ChatWithToolsResponse> => {
-  const responseGenerator = async function* (): AsyncGenerator<
-    boolean | ChatResponseChunk,
-    void,
-    unknown
-  > {
-    const responseStream = await llm.chat({ messages, tools, stream: true });
-
-    let fullResponse = null;
-    let yieldedIndicator = false;
-    const toolCallMap = new Map();
-    for await (const chunk of responseStream) {
-      const hasToolCalls = chunk.options && "toolCall" in chunk.options;
-      if (!hasToolCalls) {
-        if (!yieldedIndicator) {
-          yield false;
-          yieldedIndicator = true;
-        }
-        yield chunk;
-      } else if (!yieldedIndicator) {
-        yield true;
-        yieldedIndicator = true;
-      }
-
-      if (chunk.options && "toolCall" in chunk.options) {
-        for (const toolCall of chunk.options.toolCall as PartialToolCall[]) {
-          if (toolCall.id) {
-            toolCallMap.set(toolCall.id, toolCall);
-          }
-        }
-      }
-
-      if (
-        hasToolCalls &&
-        // eslint-disable-next-line @typescript-eslint/no-explicit-any
-        (chunk.raw as any)?.choices?.[0]?.finish_reason !== null
-      ) {
-        // Update the fullResponse with the tool calls
-        const toolCalls = Array.from(toolCallMap.values());
-        fullResponse = {
-          ...chunk,
-          options: {
-            ...chunk.options,
-            toolCall: toolCalls,
-          },
-        };
-      }
-    }
-
-    if (fullResponse) {
-      yield fullResponse;
-    }
-  };
-
-  const generator = responseGenerator();
-  const isToolCall = await generator.next();
-
-  if (isToolCall.value) {
-    // If it's a tool call, we need to wait for the full response
-    let fullResponse = null;
-    for await (const chunk of generator) {
-      fullResponse = chunk;
-    }
-
-    if (fullResponse) {
-      const responseChunk = fullResponse as ChatResponseChunk;
-      const toolCalls = getToolCallsFromResponse(responseChunk);
-      return new ChatWithToolsResponse({
-        toolCalls,
-        toolCallMessage: {
-          options: responseChunk.options,
-          role: "assistant",
-          content: "",
-        },
-      });
-    } else {
-      throw new Error("Cannot get tool calls from response");
-    }
-  }
-
-  return new ChatWithToolsResponse({
-    toolCalls: [],
-    responseGenerator: generator as AsyncGenerator<ChatResponseChunk>,
-  });
-};
-
-export const getToolCallsFromResponse = (
-  response:
-    | ChatResponse<ToolCallLLMMessageOptions>
-    | ChatResponseChunk<ToolCallLLMMessageOptions>,
-): ToolCall[] => {
-  let options;
-
-  if ("message" in response) {
-    options = response.message.options;
-  } else {
-    options = response.options;
-  }
-
-  if (options && "toolCall" in options) {
-    return options.toolCall as ToolCall[];
-  }
-  return [];
-};
diff --git a/packages/server/src/workflow/type.ts b/packages/server/src/workflow/type.ts
deleted file mode 100644
index 3f08d1f314e21213c5bed836f0eeb3f4c790fdc0..0000000000000000000000000000000000000000
--- a/packages/server/src/workflow/type.ts
+++ /dev/null
@@ -1,29 +0,0 @@
-import {
-  Workflow,
-  WorkflowEvent,
-  type ChatMessage,
-  type ChatResponseChunk,
-} from "llamaindex";
-
-export type AgentInput = {
-  messages: ChatMessage[];
-};
-
-export type AgentRunEventType = "text" | "progress";
-
-export type ProgressEventData = {
-  id: string;
-  total: number;
-  current: number;
-};
-
-export type AgentRunEventData = ProgressEventData;
-
-export class AgentRunEvent extends WorkflowEvent<{
-  agent: string;
-  text: string;
-  type: AgentRunEventType;
-  data?: AgentRunEventData;
-}> {}
-
-export type ServerWorkflow = Workflow<null, AgentInput, ChatResponseChunk>;
diff --git a/packages/server/tsconfig.json b/packages/server/tsconfig.json
deleted file mode 100644
index a93775d954ab510bbae6d3aaabe2c7204f557e2b..0000000000000000000000000000000000000000
--- a/packages/server/tsconfig.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-  "extends": "../../tsconfig.json",
-  "compilerOptions": {
-    "rootDir": "./src",
-    "outDir": "./dist/type",
-    "tsBuildInfoFile": "./dist/.tsbuildinfo",
-    "emitDeclarationOnly": true,
-    "moduleResolution": "Bundler",
-    "skipLibCheck": true,
-    "strict": true,
-    "types": ["node"]
-  },
-  "include": ["./src"],
-  "exclude": ["node_modules"]
-}
diff --git a/tsconfig.json b/tsconfig.json
index 46fc7a20eb7bca789b75dfeb0d52c53b5689dbd9..187b9ee5a9a537bfb8d41b379f71e9e80ec1c929 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -194,9 +194,6 @@
     {
       "path": "./packages/tools/tsconfig.json"
     },
-    {
-      "path": "./packages/server/tsconfig.json"
-    },
     {
       "path": "./packages/server-next/tsconfig.json"
     }