diff --git a/apps/simple/llmStream.ts b/apps/simple/llmStream.ts
new file mode 100644
index 0000000000000000000000000000000000000000..e9ebce4441cb2849d534947061114999a5e3a980
--- /dev/null
+++ b/apps/simple/llmStream.ts
@@ -0,0 +1,47 @@
+import { ChatMessage, OpenAI, SimpleChatEngine } from "llamaindex";
+import {Anthropic} from "../../packages/core/src/llm/LLM";
+import { stdin as input, stdout as output } from "node:process";
+import readline from "node:readline/promises";
+
+async function main() {
+  const query: string = `
+Where is Istanbul?
+  `;
+
+  // const llm = new OpenAI({ model: "gpt-3.5-turbo", temperature: 0.1 });
+  const llm = new Anthropic();
+  const message: ChatMessage = { content: query, role: "user" };
+
+  //TODO: Add callbacks later
+
+  //Stream Complete
+  //Note: Setting streaming flag to true or false will auto-set your return type to
+  //either an AsyncGenerator or a Response.
+  // Omitting the streaming flag automatically sets streaming to false
+
+  const chatEngine: SimpleChatEngine = new SimpleChatEngine({
+    chatHistory: undefined,
+    llm: llm,
+  });
+
+  const rl = readline.createInterface({ input, output });
+  while (true) {
+    const query = await rl.question("Query: ");
+
+    if (!query) {
+      break;
+    }
+
+    //Case 1: .chat(query, undefined, true) => Stream
+    //Case 2: .chat(query, undefined, false) => Response object
+    //Case 3: .chat(query, undefined) => Response object
+    const chatStream = await chatEngine.chat(query, undefined, true);
+    var accumulated_result = "";
+    for await (const part of chatStream) {
+      accumulated_result += part;
+      process.stdout.write(part);
+    }
+  }
+}
+
+main();
diff --git a/examples/llm_stream.ts b/examples/llm_stream.ts
deleted file mode 100644
index d0cf8099213d926c89c67657e948161aef605611..0000000000000000000000000000000000000000
--- a/examples/llm_stream.ts
+++ /dev/null
@@ -1,61 +0,0 @@
-import * as tiktoken from "tiktoken-node";
-import {
-  CallbackManager,
-  Event,
-  EventType,
-} from "../packages/core/src/callbacks/CallbackManager";
-import { ChatMessage, MessageType, OpenAI } from "../packages/core/src/llm/LLM";
-
-async function main() {
-  const query: string = "Where is Istanbul?";
-
-  const llm = new OpenAI({ model: "gpt-3.5-turbo", temperature: 0.1 });
-  const message: ChatMessage = { content: query, role: "user" };
-
-  var accumulated_result: string = "";
-  var total_tokens: number = 0;
-
-  //Callback stuff, like logging token usage.
-  //GPT 3.5 Turbo uses CL100K_Base encodings, check your LLM to see which tokenizer it uses.
-  const encoding = tiktoken.getEncoding("cl100k_base");
-
-  const callback: CallbackManager = new CallbackManager();
-  callback.onLLMStream = (callback_response) => {
-    //Token text
-    const text = callback_response.token.choices[0].delta.content
-      ? callback_response.token.choices[0].delta.content
-      : "";
-    //Increment total number of tokens
-    total_tokens += encoding.encode(text).length;
-  };
-
-  llm.callbackManager = callback;
-
-  //Create a dummy event to trigger our Stream Callback
-  const dummy_event: Event = {
-    id: "something",
-    type: "intermediate" as EventType,
-  };
-
-  //Stream Complete
-  const stream = llm.stream_complete(query, dummy_event);
-
-  for await (const part of stream) {
-    //This only gives you the string part of a stream
-    console.log(part);
-    accumulated_result += part;
-  }
-
-  const correct_total_tokens: number =
-    encoding.encode(accumulated_result).length;
-
-  //Check if our stream token counter works
-  console.log(
-    `Output token total using tokenizer on accumulated output: ${correct_total_tokens}`,
-  );
-  console.log(
-    `Output token total using tokenizer on stream output: ${total_tokens}`,
-  );
-}
-
-main();
diff --git a/packages/core/src/ChatEngine.ts b/packages/core/src/ChatEngine.ts
index 461304ea0bb257ada2e320d6d1768f8426d340af..bc299356e36263a247d047b1b4b807e843162ee1 100644
--- a/packages/core/src/ChatEngine.ts
+++ b/packages/core/src/ChatEngine.ts
@@ -23,8 +23,16 @@ export interface ChatEngine {
    * Send message along with the class's current chat history to the LLM.
    * @param message
    * @param chatHistory optional chat history if you want to customize the chat history
+   * @param streaming optional streaming flag, which auto-sets the return value if True.
    */
-  chat(message: string, chatHistory?: ChatMessage[]): Promise<Response>;
+  chat<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : Response,
+  >(
+    message: string,
+    chatHistory?: ChatMessage[],
+    streaming?: T,
+  ): Promise<R>;
 
   /**
    * Resets the chat history so that it's empty.
@@ -44,13 +52,45 @@
     this.llm = init?.llm ?? new OpenAI();
   }
 
-  async chat(message: string, chatHistory?: ChatMessage[]): Promise<Response> {
+  async chat<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : Response,
+  >(message: string, chatHistory?: ChatMessage[], streaming?: T): Promise<R> {
+    //Streaming option
+    if (streaming) {
+      return this.streamChat(message, chatHistory) as R;
+    }
+
+    //Non-streaming option
     chatHistory = chatHistory ?? this.chatHistory;
     chatHistory.push({ content: message, role: "user" });
-    const response = await this.llm.chat(chatHistory);
+    const response = await this.llm.chat(chatHistory, undefined);
     chatHistory.push(response.message);
     this.chatHistory = chatHistory;
-    return new Response(response.message.content);
+    return new Response(response.message.content) as R;
+  }
+
+  protected async *streamChat(
+    message: string,
+    chatHistory?: ChatMessage[],
+  ): AsyncGenerator<string, void, unknown> {
+    chatHistory = chatHistory ?? this.chatHistory;
+    chatHistory.push({ content: message, role: "user" });
+    const response_generator = await this.llm.chat(
+      chatHistory,
+      undefined,
+      true,
+    );
+
+    var accumulator: string = "";
+    for await (const part of response_generator) {
+      accumulator += part;
+      yield part;
+    }
+
+    chatHistory.push({ content: accumulator, role: "assistant" });
+    this.chatHistory = chatHistory;
+    return;
   }
 
   reset() {
@@ -99,10 +139,14 @@ export class CondenseQuestionChatEngine implements ChatEngine {
     );
   }
 
-  async chat(
+  async chat<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : Response,
+  >(
     message: string,
     chatHistory?: ChatMessage[] | undefined,
-  ): Promise<Response> {
+    streaming?: T,
+  ): Promise<R> {
     chatHistory = chatHistory ?? this.chatHistory;
 
     const condensedQuestion = (
@@ -114,7 +158,7 @@
     chatHistory.push({ content: message, role: "user" });
     chatHistory.push({ content: response.response, role: "assistant" });
 
-    return response;
+    return response as R;
   }
 
   reset() {
@@ -129,13 +173,13 @@
  */
 export class ContextChatEngine implements ChatEngine {
   retriever: BaseRetriever;
-  chatModel: OpenAI;
+  chatModel: LLM;
   chatHistory: ChatMessage[];
   contextSystemPrompt: ContextSystemPrompt;
 
   constructor(init: {
     retriever: BaseRetriever;
-    chatModel?: OpenAI;
+    chatModel?: LLM;
     chatHistory?: ChatMessage[];
     contextSystemPrompt?: ContextSystemPrompt;
   }) {
@@ -147,9 +191,21 @@ export class ContextChatEngine implements ChatEngine {
       init?.contextSystemPrompt ?? defaultContextSystemPrompt;
   }
 
-  async chat(message: string, chatHistory?: ChatMessage[] | undefined) {
+  async chat<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : Response,
+  >(
+    message: string,
+    chatHistory?: ChatMessage[] | undefined,
+    streaming?: T,
+  ): Promise<R> {
     chatHistory = chatHistory ?? this.chatHistory;
 
+    //Streaming option
+    if (streaming) {
+      return this.streamChat(message, chatHistory) as R;
+    }
+
     const parentEvent: Event = {
       id: uuidv4(),
       type: "wrapper",
@@ -182,7 +238,52 @@ export class ContextChatEngine implements ChatEngine {
     return new Response(
       response.message.content,
       sourceNodesWithScore.map((r) => r.node),
+    ) as R;
+  }
+
+  protected async *streamChat(
+    message: string,
+    chatHistory?: ChatMessage[] | undefined,
+  ): AsyncGenerator<string, void, unknown> {
+    chatHistory = chatHistory ?? this.chatHistory;
+
+    const parentEvent: Event = {
+      id: uuidv4(),
+      type: "wrapper",
+      tags: ["final"],
+    };
+    const sourceNodesWithScore = await this.retriever.retrieve(
+      message,
+      parentEvent,
     );
+
+    const systemMessage: ChatMessage = {
+      content: this.contextSystemPrompt({
+        context: sourceNodesWithScore
+          .map((r) => (r.node as TextNode).text)
+          .join("\n\n"),
+      }),
+      role: "system",
+    };
+
+    chatHistory.push({ content: message, role: "user" });
+
+    const response_stream = await this.chatModel.chat(
+      [systemMessage, ...chatHistory],
+      parentEvent,
+      true,
+    );
+    var accumulator: string = "";
+    for await (const part of response_stream) {
+      accumulator += part;
+      yield part;
+    }
+
+    chatHistory.push({ content: accumulator, role: "system" });
+
+    this.chatHistory = chatHistory;
+
+    return;
   }
 
   reset() {
@@ -203,11 +304,42 @@ export class HistoryChatEngine implements ChatEngine {
     this.llm = init?.llm ?? new OpenAI();
   }
 
-  async chat(message: string): Promise<Response> {
+  async chat<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : Response,
+  >(
+    message: string,
+    chatHistory?: ChatMessage[] | undefined,
+    streaming?: T,
+  ): Promise<R> {
+    //Streaming option
+    if (streaming) {
+      return this.streamChat(message, chatHistory) as R;
+    }
     this.chatHistory.addMessage({ content: message, role: "user" });
     const response = await this.llm.chat(this.chatHistory.messages);
     this.chatHistory.addMessage(response.message);
-    return new Response(response.message.content);
+    return new Response(response.message.content) as R;
+  }
+
+  protected async *streamChat(
+    message: string,
+    chatHistory?: ChatMessage[] | undefined,
+  ): AsyncGenerator<string, void, unknown> {
+    this.chatHistory.addMessage({ content: message, role: "user" });
+    const response_stream = await this.llm.chat(
+      this.chatHistory.messages,
+      undefined,
+      true,
+    );
+
+    var accumulator = "";
+    for await (const part of response_stream) {
+      accumulator += part;
+      yield part;
+    }
+    this.chatHistory.addMessage({ content: accumulator, role: "user" });
+    return;
   }
 
   reset() {
diff --git a/packages/core/src/callbacks/CallbackManager.ts b/packages/core/src/callbacks/CallbackManager.ts
index fd4ced7a8aaba13f4992596fb4dd3a9900167eb6..266058261ecaa2024cf55ecc792ee5ccce606c33 100644
--- a/packages/core/src/callbacks/CallbackManager.ts
+++ b/packages/core/src/callbacks/CallbackManager.ts
@@ -39,6 +39,13 @@ export interface DefaultStreamToken {
 //OpenAI stream token schema is the default.
 //Note: Anthropic and Replicate also use similar token schemas.
 export type OpenAIStreamToken = DefaultStreamToken;
+export type AnthropicStreamToken = {
+  completion: string;
+  model: string;
+  stop_reason: string | undefined;
+  stop?: boolean | undefined;
+  log_id?: string;
+};
 
 //
 //Callback Responses
diff --git a/packages/core/src/llm/LLM.ts b/packages/core/src/llm/LLM.ts
index 54bfbae11e7927a05412f938ccc83759726a35db..777a22367775b34b65117ed1d0711c2858158dc2 100644
--- a/packages/core/src/llm/LLM.ts
+++ b/packages/core/src/llm/LLM.ts
@@ -1,5 +1,6 @@
 import OpenAILLM, { ClientOptions as OpenAIClientOptions } from "openai";
 import {
+  AnthropicStreamToken,
   CallbackManager,
   Event,
   EventType,
@@ -48,27 +49,35 @@ export type CompletionResponse = ChatResponse;
  * Unified language model interface
  */
 export interface LLM {
+  //Whether a LLM has streaming support
+  hasStreaming: boolean;
   /**
    * Get a chat response from the LLM
    * @param messages
+   *
+   * The return type of chat() and complete() are set by the "streaming" parameter being set to True.
    */
-  chat(messages: ChatMessage[], parentEvent?: Event): Promise<ChatResponse>;
+  chat<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : ChatResponse,
+  >(
+    messages: ChatMessage[],
+    parentEvent?: Event,
+    streaming?: T,
+  ): Promise<R>;
 
   /**
    * Get a prompt completion from the LLM
    * @param prompt the prompt to complete
    */
-  complete(prompt: string, parentEvent?: Event): Promise<CompletionResponse>;
-
-  stream_chat?(
-    messages: ChatMessage[],
-    parentEvent?: Event,
-  ): AsyncGenerator<string, void, unknown>;
-
-  stream_complete?(
-    query: string,
+  complete<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : ChatResponse,
+  >(
+    prompt: string,
     parentEvent?: Event,
-  ): AsyncGenerator<string, void, unknown>;
+    streaming?: T,
+  ): Promise<R>;
 }
 
 export const GPT4_MODELS = {
@@ -102,6 +111,7 @@ export class OpenAI implements LLM {
     Partial<OpenAILLM.Chat.CompletionCreateParams>,
     "max_tokens" | "messages" | "model" | "temperature" | "top_p" | "streaming"
   >;
+  hasStreaming: boolean;
 
   // OpenAI session params
   apiKey?: string = undefined;
@@ -129,6 +139,7 @@
     this.timeout = init?.timeout ?? 60 * 1000; // Default is 60 seconds
     this.additionalChatOptions = init?.additionalChatOptions;
     this.additionalSessionOptions = init?.additionalSessionOptions;
+    this.hasStreaming = init?.hasStreaming ?? true;
 
     if (init?.azure || shouldUseAzure()) {
       const azureConfig = getAzureConfigFromEnv({
@@ -186,10 +197,10 @@
     }
   }
 
-  async chat(
-    messages: ChatMessage[],
-    parentEvent?: Event,
-  ): Promise<ChatResponse> {
+  async chat<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : ChatResponse,
+  >(messages: ChatMessage[], parentEvent?: Event, streaming?: T): Promise<R> {
     const baseRequestParams: OpenAILLM.Chat.CompletionCreateParams = {
       model: this.model,
       temperature: this.temperature,
@@ -201,6 +212,13 @@
       top_p: this.topP,
       ...this.additionalChatOptions,
     };
+    // Streaming
+    if (streaming) {
+      if (!this.hasStreaming) {
+        throw Error("No streaming support for this LLM.");
+      }
+      return this.streamChat(messages, parentEvent) as R;
+    }
 
     // Non-streaming
     const response = await this.session.openai.chat.completions.create({
@@ -208,20 +226,26 @@
       ...baseRequestParams,
     });
     const content = response.choices[0].message?.content ?? "";
-    return { message: { content, role: response.choices[0].message.role } };
+    return {
+      message: { content, role: response.choices[0].message.role },
+    } as R;
   }
 
-  async complete(
-    prompt: string,
-    parentEvent?: Event,
-  ): Promise<CompletionResponse> {
-    return this.chat([{ content: prompt, role: "user" }], parentEvent);
+  async complete<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : ChatResponse,
+  >(prompt: string, parentEvent?: Event, streaming?: T): Promise<R> {
+    return this.chat(
+      [{ content: prompt, role: "user" }],
+      parentEvent,
+      streaming,
+    );
   }
 
   //We can wrap a stream in a generator to add some additional logging behavior
   //For future edits: syntax for generator type is <typeof Yield, typeof Return, typeof Accept>
   //"typeof Accept" refers to what types you'll accept when you manually call generator.next(<AcceptType>)
-  async *stream_chat(
+  protected async *streamChat(
     messages: ChatMessage[],
     parentEvent?: Event,
   ): AsyncGenerator<string, void, unknown> {
@@ -279,12 +303,12 @@
     return;
   }
 
-  //Stream_complete doesn't need to be async because it's child function is already async
-  stream_complete(
+  //streamComplete doesn't need to be async because it's child function is already async
+  protected streamComplete(
     query: string,
     parentEvent?: Event,
   ): AsyncGenerator<string, void, unknown> {
-    return this.stream_chat([{ content: query, role: "user" }], parentEvent);
+    return this.streamChat([{ content: query, role: "user" }], parentEvent);
   }
 }
 
@@ -348,6 +372,7 @@ export class LlamaDeuce implements LLM {
   topP: number;
   maxTokens?: number;
   replicateSession: ReplicateSession;
+  hasStreaming: boolean;
 
   constructor(init?: Partial<LlamaDeuce>) {
     this.model = init?.model ?? "Llama-2-70b-chat-4bit";
@@ -362,6 +387,7 @@
       init?.maxTokens ??
       ALL_AVAILABLE_LLAMADEUCE_MODELS[this.model].contextWindow; // For Replicate, the default is 500 tokens which is too low.
     this.replicateSession = init?.replicateSession ?? new ReplicateSession();
+    this.hasStreaming = init?.hasStreaming ?? false;
   }
 
   mapMessagesToPrompt(messages: ChatMessage[]) {
@@ -468,10 +494,10 @@ If a question does not make any sense, or is not factually coherent, explain why
     };
   }
 
-  async chat(
-    messages: ChatMessage[],
-    _parentEvent?: Event,
-  ): Promise<ChatResponse> {
+  async chat<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : ChatResponse,
+  >(messages: ChatMessage[], _parentEvent?: Event, streaming?: T): Promise<R> {
     const api = ALL_AVAILABLE_LLAMADEUCE_MODELS[this.model]
       .replicateApi as `${string}/${string}:${string}`;
 
@@ -492,6 +518,9 @@ If a question does not make any sense, or is not factually coherent, explain why
       replicateOptions.input.max_length = this.maxTokens;
     }
 
+    //TODO: Add streaming for this
+
+    //Non-streaming
     const response = await this.replicateSession.replicate.run(
       api,
       replicateOptions,
@@ -502,13 +531,13 @@ If a question does not make any sense, or is not factually coherent, explain why
         //^ We need to do this because Replicate returns a list of strings (for streaming functionality which is not exposed by the run function)
         role: "assistant",
       },
-    };
+    } as R;
   }
 
-  async complete(
-    prompt: string,
-    parentEvent?: Event,
-  ): Promise<CompletionResponse> {
+  async complete<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : ChatResponse,
+  >(prompt: string, parentEvent?: Event, streaming?: T): Promise<R> {
     return this.chat([{ content: prompt, role: "user" }], parentEvent);
   }
 }
@@ -529,6 +558,7 @@ export class Anthropic implements LLM {
   maxRetries: number;
   timeout?: number;
   session: AnthropicSession;
+  hasStreaming: boolean;
   callbackManager?: CallbackManager;
 
   constructor(init?: Partial<Anthropic>) {
@@ -548,6 +578,7 @@
       maxRetries: this.maxRetries,
       timeout: this.timeout,
     });
+    this.hasStreaming = init?.hasStreaming ?? true;
 
     this.callbackManager = init?.callbackManager;
   }
@@ -567,10 +598,22 @@
     );
   }
 
-  async chat(
+  async chat<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : ChatResponse,
+  >(
     messages: ChatMessage[],
     parentEvent?: Event | undefined,
-  ): Promise<ChatResponse> {
+    streaming?: T,
+  ): Promise<R> {
+    //Streaming
+    if (streaming) {
+      if (!this.hasStreaming) {
+        throw Error("No streaming support for this LLM.");
+      }
+      return this.streamChat(messages, parentEvent) as R;
+    }
+    //Non-streaming
     const response = await this.session.anthropic.completions.create({
       model: this.model,
       prompt: this.mapMessagesToPrompt(messages),
@@ -583,12 +626,56 @@
       message: { content: response.completion.trimStart(), role: "assistant" },
       //^ We're trimming the start because Anthropic often starts with a space in the response
       // That space will be re-added when we generate the next prompt.
-    };
+    } as R;
+  }
+
+  protected async *streamChat(
+    messages: ChatMessage[],
+    parentEvent?: Event | undefined,
+  ): AsyncGenerator<string, void, unknown> {
+    // AsyncIterable<AnthropicStreamToken>
+    const stream: AsyncIterable<AnthropicStreamToken> =
+      await this.session.anthropic.completions.create({
+        model: this.model,
+        prompt: this.mapMessagesToPrompt(messages),
+        max_tokens_to_sample: this.maxTokens ?? 100000,
+        temperature: this.temperature,
+        top_p: this.topP,
+        stream: true,
+      });
+
+    var idx_counter: number = 0;
+    for await (const part of stream) {
+      //TODO: LLM Stream Callback, pending re-work.
+
+      idx_counter++;
+      yield part.completion;
+    }
+    return;
   }
-  async complete(
+
+  async complete<
+    T extends boolean | undefined = undefined,
+    R = T extends true ? AsyncGenerator<string, void, unknown> : ChatResponse,
+  >(
     prompt: string,
     parentEvent?: Event | undefined,
-  ): Promise<CompletionResponse> {
-    return this.chat([{ content: prompt, role: "user" }], parentEvent);
+    streaming?: T,
+  ): Promise<R> {
+    if (streaming) {
+      return this.streamComplete(prompt, parentEvent) as R;
+    }
+    return this.chat(
+      [{ content: prompt, role: "user" }],
+      parentEvent,
+      streaming,
+    ) as R;
+  }
+
+  protected streamComplete(
+    prompt: string,
+    parentEvent?: Event | undefined,
+  ): AsyncGenerator<string, void, unknown> {
+    return this.streamChat([{ content: prompt, role: "user" }], parentEvent);
+  }
 }
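
A usage sketch of the API added by this patch (not itself part of the diff): with the streaming flag set to true, chat() resolves to an AsyncGenerator<string, void, unknown>; with the flag omitted or false, it resolves to a Response object. This assumes SimpleChatEngine is available from the "llamaindex" package as in apps/simple/llmStream.ts above, and that the core Response class exposes the reply text via its response field.

import { SimpleChatEngine } from "llamaindex";

async function demo() {
  // Per the SimpleChatEngine constructor in this patch, the LLM defaults to OpenAI.
  const chatEngine = new SimpleChatEngine();

  // streaming = true: the call resolves to an AsyncGenerator yielding string deltas.
  const stream = await chatEngine.chat("Where is Istanbul?", undefined, true);
  for await (const part of stream) {
    process.stdout.write(part);
  }

  // streaming omitted (or false): the call resolves to a single Response object.
  const response = await chatEngine.chat("How large is the city?");
  console.log(response.response);
}

demo();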