Commit b4b4ff57 authored by thucpn

feat: @llamaindex/server

parent 19c90bda
packages/server/package.json
{
"name": "@llamaindex/server",
"description": "LlamaIndex Server",
"version": "0.0.1",
"type": "module",
"main": "./dist/index.cjs",
"module": "./dist/index.js",
"exports": {
".": {
"require": {
"types": "./dist/index.d.cts",
"default": "./dist/index.cjs"
},
"import": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
}
},
"files": [
"dist"
],
"repository": {
"type": "git",
"url": "git+https://github.com/run-llama/LlamaIndexTS.git",
"directory": "packages/server"
},
"scripts": {
"build": "bunchee",
"dev": "bunchee --watch",
"test": "vitest run"
},
"devDependencies": {
"bunchee": "6.4.0",
"vitest": "^2.1.5",
"@types/node": "^22.9.0",
"@types/express": "^4.17.21"
},
"dependencies": {
"@llamaindex/core": "workspace:*",
"@llamaindex/env": "workspace:*",
"@llamaindex/tools": "workspace:*",
"llamaindex": "workspace:*",
"ai": "^4.0.3",
"@llamaindex/openai": "^0.1.52",
"@llamaindex/readers": "^2.0.0"
},
"peerDependencies": {
"express": "^4.18.2"
}
}
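A quick consumption sketch (not part of this commit): the conditional exports above mean an ESM import resolves to ./dist/index.js, while a CommonJS require falls back to ./dist/index.cjs, each with matching declaration files.

// esm-consumer.ts — resolves "./dist/index.js" via the "import" condition.
import { LlamaIndexServer } from "@llamaindex/server";

// A CommonJS consumer would instead hit the "require" condition and load
// "./dist/index.cjs":
//   const { LlamaIndexServer } = require("@llamaindex/server");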
packages/server/src/helper.ts
import type { Message } from "ai";
import type { Response as ExpressResponse } from "express";
export async function pipeExpressResponse(
expressResponse: ExpressResponse,
streamResponse: Response,
) {
if (!streamResponse.body) return;
const reader = streamResponse.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) return expressResponse.end();
expressResponse.write(value);
}
}
export function getUserMessageContent(messages: Message[]): string {
  const lastMessage = messages[messages.length - 1];
  // Enforce what the error message promises: the last message must be a
  // non-empty user message.
  if (!lastMessage || lastMessage.role !== "user" || !lastMessage.content) {
    throw new Error(
      "Messages are required and the last message must be from the user",
    );
  }
  return lastMessage.content;
}
export * from "./server";
export * from "./workflow/tools"; // helper functions for tools
export * from "./workflow/type"; // type definitions for the workflow
packages/server/src/server.ts
import type { Message } from "ai";
import express from "express";
import { getUserMessageContent, pipeExpressResponse } from "./helper";
import { chatWithWorkflow } from "./workflow/stream";
import type { ServerWorkflow } from "./workflow/type";
export interface LlamaIndexServerParams {
workflow: ServerWorkflow;
port?: number;
}
export class LlamaIndexServer {
app: express.Application;
workflow: ServerWorkflow;
port: number;
constructor({ workflow, port = 8000 }: LlamaIndexServerParams) {
this.app = express();
this.workflow = workflow;
this.port = port;
this.setupRoutes();
}
public chatController = async (
req: express.Request,
res: express.Response,
) => {
try {
const { messages } = req.body as { messages: Message[] };
const userMessageContent = getUserMessageContent(messages);
const streamResponse = await chatWithWorkflow(
userMessageContent,
this.workflow,
);
await pipeExpressResponse(res, streamResponse);
} catch (error) {
console.error("Chat error:", error);
res.status(500).json({ error: "Internal server error" });
}
};
protected setupRoutes() {
this.app.use(express.json());
this.app.post("/chat", this.chatController);
}
public start() {
this.app.listen(this.port, () => {
console.log(`LlamaIndex server running on port ${this.port}`);
});
}
}
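A usage sketch, not part of this commit: any ServerWorkflow can be served like this (`myWorkflow` is a placeholder; see the echo workflow sketched after stream.ts below), and the /chat route accepts Vercel AI SDK-style message lists.

// Usage sketch (placeholder workflow, not part of this commit).
import { LlamaIndexServer, type ServerWorkflow } from "@llamaindex/server";

declare const myWorkflow: ServerWorkflow; // assumed to be defined elsewhere

const server = new LlamaIndexServer({ workflow: myWorkflow, port: 8000 });
server.start();

// The endpoint then accepts Vercel AI SDK-style messages:
//   curl -X POST http://localhost:8000/chat \
//     -H "Content-Type: application/json" \
//     -d '{"messages":[{"role":"user","content":"Hello"}]}'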
packages/server/src/workflow/stream.ts
import { LlamaIndexAdapter, StreamData, type JSONValue } from "ai";
import {
EngineResponse,
StopEvent,
Workflow,
WorkflowContext,
WorkflowEvent,
type ChatResponseChunk,
} from "llamaindex";
import { ReadableStream } from "node:stream/web";
import { AgentRunEvent, type AgentInput } from "./type";
export async function chatWithWorkflow(
message: string,
workflow: Workflow<null, AgentInput, ChatResponseChunk>,
): Promise<Response> {
const context = workflow.run({ message });
const { stream, dataStream } = await createStreamFromWorkflowContext(context);
const response = LlamaIndexAdapter.toDataStreamResponse(stream, {
data: dataStream,
});
return response;
}
async function createStreamFromWorkflowContext<Input, Output, Context>(
context: WorkflowContext<Input, Output, Context>,
): Promise<{ stream: ReadableStream<EngineResponse>; dataStream: StreamData }> {
const dataStream = new StreamData();
  // Obtain a single iterator up front; grabbing a fresh iterator on every
  // pull could restart the event stream or drop events.
  const iterator = context[Symbol.asyncIterator]();
  let generator: AsyncGenerator<ChatResponseChunk> | undefined;
const closeStreams = (controller: ReadableStreamDefaultController) => {
controller.close();
dataStream.close();
};
const stream = new ReadableStream<EngineResponse>({
async start(controller) {
// Kickstart the stream by sending an empty string
controller.enqueue({ delta: "" } as EngineResponse);
},
async pull(controller) {
      while (!generator) {
        // Get the next event from the workflow context via the shared iterator
        const { value: event, done } = await iterator.next();
if (done) {
closeStreams(controller);
return;
}
generator = handleEvent(event, dataStream);
}
const { value: chunk, done } = await generator.next();
if (done) {
closeStreams(controller);
return;
}
const delta = chunk.delta ?? "";
if (delta) {
controller.enqueue({ delta } as EngineResponse);
}
},
});
return { stream, dataStream };
}
function handleEvent(
event: WorkflowEvent<unknown>,
dataStream: StreamData,
): AsyncGenerator<ChatResponseChunk> | undefined {
// Handle for StopEvent
if (event instanceof StopEvent) {
return event.data as AsyncGenerator<ChatResponseChunk>;
}
// Handle for AgentRunEvent
if (event instanceof AgentRunEvent) {
dataStream.appendMessageAnnotation({
type: "agent",
data: event.data as JSONValue,
});
}
}
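To make the contract above concrete, here is a minimal echo workflow sketch. It is an assumption for illustration, using the addStep({ inputs, outputs }, handler) API that llamaindex exposed at the time, and AgentInput from workflow/type below; the key point is that the StopEvent's data is the async generator of chunks that createStreamFromWorkflowContext drains.

// Illustrative echo workflow (assumes the addStep({ inputs, outputs }, handler)
// workflow API from `llamaindex`; not part of this commit).
import {
  StartEvent,
  StopEvent,
  Workflow,
  type ChatResponseChunk,
} from "llamaindex";
import type { AgentInput } from "@llamaindex/server";

const echoWorkflow = new Workflow<null, AgentInput, ChatResponseChunk>();
echoWorkflow.addStep(
  { inputs: [StartEvent<AgentInput>], outputs: [StopEvent<ChatResponseChunk>] },
  async (_ctx, event) => {
    const { message } = event.data;
    async function* chunks(): AsyncGenerator<ChatResponseChunk> {
      yield { delta: `Echo: ${message}`, raw: null };
    }
    // handleEvent above reads StopEvent.data as this async generator.
    return new StopEvent(chunks() as unknown as ChatResponseChunk);
  },
);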
packages/server/src/workflow/tools.ts
import {
type BaseToolWithCall,
callTool,
type ChatMessage,
type ChatResponse,
type ChatResponseChunk,
type HandlerContext,
type PartialToolCall,
type ToolCall,
ToolCallLLM,
type ToolCallLLMMessageOptions,
} from "llamaindex";
import crypto from "node:crypto";
import { AgentRunEvent } from "./type";
/**
* Call multiple tools and return the tool messages
*/
export const callTools = async <T>({
tools,
toolCalls,
ctx,
agentName,
writeEvent = true,
}: {
toolCalls: ToolCall[];
tools: BaseToolWithCall[];
ctx: HandlerContext<T>;
agentName: string;
writeEvent?: boolean;
}): Promise<ChatMessage[]> => {
const toolMsgs: ChatMessage[] = [];
if (toolCalls.length === 0) {
return toolMsgs;
}
if (toolCalls.length === 1 && toolCalls[0]) {
const tool = tools.find(
(tool) => tool.metadata.name === toolCalls[0]!.name,
);
if (!tool) {
throw new Error(`Tool ${toolCalls[0].name} not found`);
}
return [
await callSingleTool(
tool,
toolCalls[0],
writeEvent
? (msg: string) => {
ctx.sendEvent(
new AgentRunEvent({
agent: agentName,
text: msg,
type: "text",
}),
);
}
: undefined,
),
];
}
// Multiple tool calls, show events in progress
const progressId = crypto.randomUUID();
const totalSteps = toolCalls.length;
let currentStep = 0;
for (const toolCall of toolCalls) {
const tool = tools.find((tool) => tool.metadata.name === toolCall.name);
if (!tool) {
throw new Error(`Tool ${toolCall.name} not found`);
}
const toolMsg = await callSingleTool(tool, toolCall, (msg: string) => {
ctx.sendEvent(
new AgentRunEvent({
agent: agentName,
text: msg,
type: "progress",
data: {
id: progressId,
total: totalSteps,
current: currentStep,
},
}),
);
currentStep++;
});
toolMsgs.push(toolMsg);
}
return toolMsgs;
};
export const callSingleTool = async (
tool: BaseToolWithCall,
toolCall: ToolCall,
eventEmitter?: (msg: string) => void,
): Promise<ChatMessage> => {
if (eventEmitter) {
eventEmitter(
`Calling tool ${toolCall.name} with input: ${JSON.stringify(toolCall.input)}`,
);
}
const toolOutput = await callTool(tool, toolCall, {
log: () => {},
error: (...args: unknown[]) => {
console.error(`Tool ${toolCall.name} got error:`, ...args);
if (eventEmitter) {
eventEmitter(`Tool ${toolCall.name} got error: ${args.join(" ")}`);
}
return {
content: JSON.stringify({
error: args.join(" "),
}),
role: "user",
options: {
toolResult: {
id: toolCall.id,
result: JSON.stringify({
error: args.join(" "),
}),
isError: true,
},
},
};
},
warn: () => {},
});
return {
content: JSON.stringify(toolOutput.output),
role: "user",
options: {
toolResult: {
result: toolOutput.output,
isError: toolOutput.isError,
id: toolCall.id,
},
},
};
};
class ChatWithToolsResponse {
toolCalls: ToolCall[];
toolCallMessage?: ChatMessage;
responseGenerator?: AsyncGenerator<ChatResponseChunk>;
constructor(options: {
toolCalls: ToolCall[];
toolCallMessage?: ChatMessage;
responseGenerator?: AsyncGenerator<ChatResponseChunk>;
}) {
this.toolCalls = options.toolCalls;
if (options.toolCallMessage) {
this.toolCallMessage = options.toolCallMessage;
}
if (options.responseGenerator) {
this.responseGenerator = options.responseGenerator;
}
}
hasMultipleTools() {
const uniqueToolNames = new Set(this.getToolNames());
return uniqueToolNames.size > 1;
}
hasToolCall() {
return this.toolCalls.length > 0;
}
getToolNames() {
return this.toolCalls.map((toolCall) => toolCall.name);
}
async asFullResponse(): Promise<ChatMessage> {
if (!this.responseGenerator) {
throw new Error("No response generator");
}
let fullResponse = "";
for await (const chunk of this.responseGenerator) {
fullResponse += chunk.delta;
}
return {
role: "assistant",
content: fullResponse,
};
}
}
export const chatWithTools = async (
llm: ToolCallLLM,
tools: BaseToolWithCall[],
messages: ChatMessage[],
): Promise<ChatWithToolsResponse> => {
const responseGenerator = async function* (): AsyncGenerator<
boolean | ChatResponseChunk,
void,
unknown
> {
const responseStream = await llm.chat({ messages, tools, stream: true });
let fullResponse = null;
let yieldedIndicator = false;
    const toolCallMap = new Map<string, PartialToolCall>();
for await (const chunk of responseStream) {
const hasToolCalls = chunk.options && "toolCall" in chunk.options;
if (!hasToolCalls) {
if (!yieldedIndicator) {
yield false;
yieldedIndicator = true;
}
yield chunk;
} else if (!yieldedIndicator) {
yield true;
yieldedIndicator = true;
}
if (chunk.options && "toolCall" in chunk.options) {
for (const toolCall of chunk.options.toolCall as PartialToolCall[]) {
if (toolCall.id) {
toolCallMap.set(toolCall.id, toolCall);
}
}
}
if (
hasToolCalls &&
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(chunk.raw as any)?.choices?.[0]?.finish_reason !== null
) {
// Update the fullResponse with the tool calls
const toolCalls = Array.from(toolCallMap.values());
fullResponse = {
...chunk,
options: {
...chunk.options,
toolCall: toolCalls,
},
};
}
}
if (fullResponse) {
yield fullResponse;
}
};
const generator = responseGenerator();
const isToolCall = await generator.next();
if (isToolCall.value) {
// If it's a tool call, we need to wait for the full response
let fullResponse = null;
for await (const chunk of generator) {
fullResponse = chunk;
}
if (fullResponse) {
const responseChunk = fullResponse as ChatResponseChunk;
const toolCalls = getToolCallsFromResponse(responseChunk);
return new ChatWithToolsResponse({
toolCalls,
toolCallMessage: {
options: responseChunk.options,
role: "assistant",
content: "",
},
});
} else {
throw new Error("Cannot get tool calls from response");
}
}
return new ChatWithToolsResponse({
toolCalls: [],
responseGenerator: generator as AsyncGenerator<ChatResponseChunk>,
});
};
export const getToolCallsFromResponse = (
response:
| ChatResponse<ToolCallLLMMessageOptions>
| ChatResponseChunk<ToolCallLLMMessageOptions>,
): ToolCall[] => {
let options;
if ("message" in response) {
options = response.message.options;
} else {
options = response.options;
}
if (options && "toolCall" in options) {
return options.toolCall as ToolCall[];
}
return [];
};
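A sketch of how these helpers compose inside a workflow step handler; `llm`, `tools`, and `ctx` are placeholders, and the round-trip flow is an assumption based on the functions above rather than code from this commit.

// Sketch: one round of tool calling inside a workflow step handler.
// `llm`, `tools`, and `ctx` are placeholders, not part of this commit.
import type {
  BaseToolWithCall,
  ChatMessage,
  HandlerContext,
  ToolCallLLM,
} from "llamaindex";
import { callTools, chatWithTools } from "@llamaindex/server";

declare const llm: ToolCallLLM;
declare const tools: BaseToolWithCall[];
declare const ctx: HandlerContext<null>;

const chatHistory: ChatMessage[] = [
  { role: "user", content: "What is the weather in Tokyo?" },
];

const response = await chatWithTools(llm, tools, chatHistory);
if (response.hasToolCall() && response.toolCallMessage) {
  // Run the requested tools (emitting AgentRunEvents along the way) and feed
  // the results back into the history for a follow-up LLM call.
  const toolMsgs = await callTools({
    toolCalls: response.toolCalls,
    tools,
    ctx,
    agentName: "example-agent",
  });
  chatHistory.push(response.toolCallMessage, ...toolMsgs);
} else {
  // No tool call: the generator streams the plain answer.
  const answer = await response.asFullResponse();
  chatHistory.push(answer);
}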
packages/server/src/workflow/type.ts
import {
Workflow,
WorkflowEvent,
type ChatResponseChunk,
type MessageContent,
} from "llamaindex";
export type AgentInput = {
message: MessageContent;
streaming?: boolean;
};
export type AgentRunEventType = "text" | "progress";
export type ProgressEventData = {
id: string;
total: number;
current: number;
};
export type AgentRunEventData = ProgressEventData;
export class AgentRunEvent extends WorkflowEvent<{
agent: string;
text: string;
type: AgentRunEventType;
data?: AgentRunEventData;
}> {}
export type ServerWorkflow = Workflow<null, AgentInput, ChatResponseChunk>;
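For reference, a handler can also surface custom progress directly. This sketch (placeholder `ctx`, hypothetical values) mirrors what callTools emits for multi-tool runs; handleEvent in stream.ts forwards such events to the client as message annotations.

// Sketch: manual progress event from a step handler (placeholder `ctx`;
// not part of this commit).
import crypto from "node:crypto";
import type { HandlerContext } from "llamaindex";
import { AgentRunEvent } from "@llamaindex/server";

declare const ctx: HandlerContext<null>;

ctx.sendEvent(
  new AgentRunEvent({
    agent: "writer",
    text: "Drafting section 1 of 3",
    type: "progress",
    data: { id: crypto.randomUUID(), total: 3, current: 0 },
  }),
);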
packages/server/tsconfig.json
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "./src",
"outDir": "./dist/type",
"tsBuildInfoFile": "./dist/.tsbuildinfo",
"emitDeclarationOnly": true,
"moduleResolution": "Bundler",
"skipLibCheck": true,
"strict": true,
"types": ["node"]
},
"include": ["./src"],
"exclude": ["node_modules"]
}
tsconfig.json
@@ -193,6 +193,9 @@
     },
     {
       "path": "./packages/tools/tsconfig.json"
+    },
+    {
+      "path": "./packages/server/tsconfig.json"
     }
   ]
 }