
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: mirrored_repos/machinelearning/run-llama/create-llama
Commits on Source (4)
Showing with 582 additions and 112 deletions
......@@ -474,13 +474,6 @@ export const installPythonTemplate = async ({
    await copyRouterCode(root, tools ?? []);
  }

  // Copy multiagents overrides
  if (template === "multiagent") {
    await copy("**", path.join(root), {
      cwd: path.join(compPath, "multiagent", "python"),
    });
  }

  if (template === "multiagent" || template === "reflex") {
    if (useCase) {
      const sourcePath =
......
......@@ -18,13 +18,13 @@ from llama_index.core.workflow import (
from app.engine.index import IndexConfig, get_index
from app.workflows.agents import plan_research, research, write_report
from app.workflows.events import SourceNodesEvent
from app.workflows.models import (
    CollectAnswersEvent,
    DataEvent,
    PlanResearchEvent,
    ReportEvent,
    ResearchEvent,
    SourceNodesEvent,
)

logger = logging.getLogger("uvicorn")
......
......@@ -4,6 +4,8 @@ from llama_index.core.schema import NodeWithScore
from llama_index.core.workflow import Event
from pydantic import BaseModel

from app.api.routers.models import SourceNodes


# Workflow events
class PlanResearchEvent(Event):
......@@ -41,3 +43,18 @@ class DataEvent(Event):
    def to_response(self):
        return self.model_dump()


class SourceNodesEvent(Event):
    nodes: List[NodeWithScore]

    def to_response(self):
        return {
            "type": "sources",
            "data": {
                "nodes": [
                    SourceNodes.from_source_node(node).model_dump()
                    for node in self.nodes
                ]
            },
        }
from enum import Enum
from typing import List, Optional

from llama_index.core.schema import NodeWithScore
from llama_index.core.workflow import Event

from app.api.routers.models import SourceNodes


class AgentRunEventType(Enum):
    TEXT = "text"
    PROGRESS = "progress"


class AgentRunEvent(Event):
    name: str
    msg: str
    event_type: AgentRunEventType = AgentRunEventType.TEXT
    data: Optional[dict] = None

    def to_response(self) -> dict:
        return {
            "type": "agent",
            "data": {
                "agent": self.name,
                "type": self.event_type.value,
                "text": self.msg,
                "data": self.data,
            },
        }


class SourceNodesEvent(Event):
    nodes: List[NodeWithScore]

    def to_response(self):
        return {
            "type": "sources",
            "data": {
                "nodes": [
                    SourceNodes.from_source_node(node).model_dump()
                    for node in self.nodes
                ]
            },
        }
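For reference, a small sketch of how the event classes above serialize for the stream; the agent name and field values below are illustrative only and not part of this diff.

# Illustrative only: construct an AgentRunEvent and print the wire format
# produced by to_response().
from app.workflows.events import AgentRunEvent, AgentRunEventType

progress = AgentRunEvent(
    name="researcher",  # hypothetical agent name
    msg="Fetching documents",
    event_type=AgentRunEventType.PROGRESS,
    data={"id": "demo-progress", "total": 3, "current": 1},
)
# -> {"type": "agent", "data": {"agent": "researcher", "type": "progress",
#     "text": "Fetching documents", "data": {"id": ..., "total": 3, "current": 1}}}
print(progress.to_response())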
import logging
import uuid
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, Callable, Optional

from llama_index.core.base.llms.types import ChatMessage, ChatResponse, MessageRole
from llama_index.core.llms.function_calling import FunctionCallingLLM
from llama_index.core.tools import (
    BaseTool,
    FunctionTool,
    ToolOutput,
    ToolSelection,
)
from llama_index.core.workflow import Context
from pydantic import BaseModel, ConfigDict

from app.workflows.events import AgentRunEvent, AgentRunEventType

logger = logging.getLogger("uvicorn")


class ContextAwareTool(FunctionTool, ABC):
    @abstractmethod
    async def acall(self, ctx: Context, input: Any) -> ToolOutput:  # type: ignore
        pass


class ChatWithToolsResponse(BaseModel):
    """
    A tool call response from chat_with_tools.
    """

    tool_calls: Optional[list[ToolSelection]]
    tool_call_message: Optional[ChatMessage]
    generator: Optional[AsyncGenerator[ChatResponse | None, None]]

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def is_calling_different_tools(self) -> bool:
        tool_names = {tool_call.tool_name for tool_call in self.tool_calls}
        return len(tool_names) > 1

    def has_tool_calls(self) -> bool:
        return self.tool_calls is not None and len(self.tool_calls) > 0

    def tool_name(self) -> str:
        assert self.has_tool_calls()
        assert not self.is_calling_different_tools()
        return self.tool_calls[0].tool_name

    async def full_response(self) -> str:
        assert self.generator is not None
        full_response = ""
        async for chunk in self.generator:
            content = chunk.message.content
            if content:
                full_response += content
        return full_response


async def chat_with_tools(  # type: ignore
    llm: FunctionCallingLLM,
    tools: list[BaseTool],
    chat_history: list[ChatMessage],
) -> ChatWithToolsResponse:
    """
    Request LLM to call tools or not.
    This function doesn't change the memory.
    """
    generator = _tool_call_generator(llm, tools, chat_history)
    is_tool_call = await generator.__anext__()
    if is_tool_call:
        # Last chunk is the full response
        # Wait for the last chunk
        full_response = None
        async for chunk in generator:
            full_response = chunk
        assert isinstance(full_response, ChatResponse)
        return ChatWithToolsResponse(
            tool_calls=llm.get_tool_calls_from_response(full_response),
            tool_call_message=full_response.message,
            generator=None,
        )
    else:
        return ChatWithToolsResponse(
            tool_calls=None,
            tool_call_message=None,
            generator=generator,
        )


async def call_tools(
    ctx: Context,
    agent_name: str,
    tools: list[BaseTool],
    tool_calls: list[ToolSelection],
    emit_agent_events: bool = True,
) -> list[ChatMessage]:
    if len(tool_calls) == 0:
        return []

    tools_by_name = {tool.metadata.get_name(): tool for tool in tools}
    if len(tool_calls) == 1:
        return [
            await call_tool(
                ctx,
                tools_by_name[tool_calls[0].tool_name],
                tool_calls[0],
                lambda msg: ctx.write_event_to_stream(
                    AgentRunEvent(
                        name=agent_name,
                        msg=msg,
                    )
                ),
            )
        ]
    # Multiple tool calls, show progress
    tool_msgs: list[ChatMessage] = []

    progress_id = str(uuid.uuid4())
    total_steps = len(tool_calls)
    if emit_agent_events:
        ctx.write_event_to_stream(
            AgentRunEvent(
                name=agent_name,
                msg=f"Making {total_steps} tool calls",
            )
        )
    for i, tool_call in enumerate(tool_calls):
        tool = tools_by_name.get(tool_call.tool_name)
        if not tool:
            tool_msgs.append(
                ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=f"Tool {tool_call.tool_name} does not exist",
                )
            )
            continue
        tool_msg = await call_tool(
            ctx,
            tool,
            tool_call,
            event_emitter=lambda msg: ctx.write_event_to_stream(
                AgentRunEvent(
                    name=agent_name,
                    msg=msg,
                    event_type=AgentRunEventType.PROGRESS,
                    data={
                        "id": progress_id,
                        "total": total_steps,
                        "current": i,
                    },
                )
            ),
        )
        tool_msgs.append(tool_msg)
    return tool_msgs


async def call_tool(
    ctx: Context,
    tool: BaseTool,
    tool_call: ToolSelection,
    event_emitter: Optional[Callable[[str], None]],
) -> ChatMessage:
    if event_emitter:
        event_emitter(
            f"Calling tool {tool_call.tool_name}, {str(tool_call.tool_kwargs)}"
        )
    try:
        if isinstance(tool, ContextAwareTool):
            if ctx is None:
                raise ValueError("Context is required for context aware tool")
            # inject context for calling a context aware tool
            response = await tool.acall(ctx=ctx, **tool_call.tool_kwargs)
        else:
            response = await tool.acall(**tool_call.tool_kwargs)  # type: ignore
        return ChatMessage(
            role=MessageRole.TOOL,
            content=str(response.raw_output),
            additional_kwargs={
                "tool_call_id": tool_call.tool_id,
                "name": tool.metadata.get_name(),
            },
        )
    except Exception as e:
        logger.error(f"Got error in tool {tool_call.tool_name}: {str(e)}")
        if event_emitter:
            event_emitter(f"Got error in tool {tool_call.tool_name}: {str(e)}")
        return ChatMessage(
            role=MessageRole.TOOL,
            content=f"Error: {str(e)}",
            additional_kwargs={
                "tool_call_id": tool_call.tool_id,
                "name": tool.metadata.get_name(),
            },
        )


async def _tool_call_generator(
    llm: FunctionCallingLLM,
    tools: list[BaseTool],
    chat_history: list[ChatMessage],
) -> AsyncGenerator[ChatResponse | bool, None]:
    response_stream = await llm.astream_chat_with_tools(
        tools,
        chat_history=chat_history,
        allow_parallel_tool_calls=False,
    )

    full_response = None
    yielded_indicator = False
    async for chunk in response_stream:
        if "tool_calls" not in chunk.message.additional_kwargs:
            # Yield a boolean to indicate whether the response is a tool call
            if not yielded_indicator:
                yield False
                yielded_indicator = True

            # if not a tool call, yield the chunks!
            yield chunk  # type: ignore
        elif not yielded_indicator:
            # Yield the indicator for a tool call
            yield True
            yielded_indicator = True

        full_response = chunk

    if full_response:
        yield full_response  # type: ignore
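A minimal sketch of how chat_with_tools and call_tools are typically combined inside a workflow step; the step body, the "example-agent" label, the use of Settings.llm, and the app.workflows.tools module path are assumptions for illustration, not part of this diff.

from llama_index.core.base.llms.types import ChatMessage
from llama_index.core.settings import Settings
from llama_index.core.tools import BaseTool
from llama_index.core.workflow import Context

from app.workflows.tools import call_tools, chat_with_tools  # assumed module path


async def run_tool_step(
    ctx: Context, chat_history: list[ChatMessage], tools: list[BaseTool]
) -> list[ChatMessage]:
    # Ask the LLM whether it wants to call a tool for the current history.
    response = await chat_with_tools(Settings.llm, tools, chat_history)
    if not response.has_tool_calls():
        # No tool call: drain the generator into a plain assistant message.
        answer = await response.full_response()
        return chat_history + [ChatMessage(role="assistant", content=answer)]
    # Execute the requested tool calls (streaming AgentRunEvent progress) and
    # append the tool results to the history for the next LLM turn.
    tool_messages = await call_tools(
        ctx=ctx,
        agent_name="example-agent",  # hypothetical agent name
        tools=tools,
        tool_calls=response.tool_calls,
    )
    return chat_history + [response.tool_call_message] + tool_messages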
import logging

from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, status

from app.api.callbacks.llamacloud import LlamaCloudFileDownload
from app.api.callbacks.next_question import SuggestNextQuestions
from app.api.callbacks.stream_handler import StreamHandler
from app.api.callbacks.source_nodes import AddNodeUrl
from app.api.routers.models import (
    ChatData,
)
from app.engine.query_filter import generate_filters
from app.workflows import create_workflow

chat_router = r = APIRouter()

logger = logging.getLogger("uvicorn")


@r.post("")
async def chat(
    request: Request,
    data: ChatData,
    background_tasks: BackgroundTasks,
):
    try:
        last_message_content = data.get_last_message_content()
        messages = data.get_history_messages(include_agent_messages=True)

        doc_ids = data.get_chat_document_ids()
        filters = generate_filters(doc_ids)
        params = data.data or {}

        workflow = create_workflow(
            params=params,
            filters=filters,
        )

        handler = workflow.run(
            user_msg=last_message_content,
            chat_history=messages,
            stream=True,
        )
        return StreamHandler.from_default(
            handler=handler,
            callbacks=[
                LlamaCloudFileDownload.from_default(background_tasks),
                SuggestNextQuestions.from_default(data),
                AddNodeUrl.from_default(),
            ],
        ).vercel_stream()
    except Exception as e:
        logger.exception("Error in chat engine", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error in chat engine: {e}",
        ) from e
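For context, the streaming endpoint above can be exercised with a client sketch like the one below; the /api/chat mount path, base URL, and payload shape are assumptions based on the ChatData model, not part of this diff.

# Hypothetical client call; adjust the base URL and route prefix to the app's setup.
import httpx

payload = {
    "messages": [
        {"role": "user", "content": "Summarize the uploaded documents."},
    ],
}

with httpx.Client(base_url="http://localhost:8000") as client:
    with client.stream("POST", "/api/chat", json=payload) as response:
        for line in response.iter_lines():
            # Vercel-style stream: text chunks plus "agent" and "sources" events
            print(line)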
import json
import logging
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, status
from llama_index.core.agent.workflow import AgentOutput
from llama_index.core.llms import MessageRole
from app.api.callbacks.llamacloud import LlamaCloudFileDownload
from app.api.callbacks.next_question import SuggestNextQuestions
from app.api.callbacks.source_nodes import AddNodeUrl
from app.api.callbacks.stream_handler import StreamHandler
from app.api.routers.models import (
    ChatData,
    Message,
    Result,
)
from app.engine.engine import get_engine
from app.engine.query_filter import generate_filters
from app.workflows import create_workflow
chat_router = r = APIRouter()
logger = logging.getLogger("uvicorn")
# streaming endpoint - delete if not needed
@r.post("")
async def chat(
    request: Request,
......@@ -31,16 +25,18 @@ async def chat(
):
    try:
        last_message_content = data.get_last_message_content()
        messages = data.get_history_messages()
        messages = data.get_history_messages(include_agent_messages=True)

        doc_ids = data.get_chat_document_ids()
        filters = generate_filters(doc_ids)
        params = data.data or {}
        logger.info(
            f"Creating chat engine with filters: {str(filters)}",
        workflow = create_workflow(
            params=params,
            filters=filters,
        )
        engine = get_engine(filters=filters, params=params)
        handler = engine.run(
        handler = workflow.run(
            user_msg=last_message_content,
            chat_history=messages,
            stream=True,
......@@ -59,35 +55,3 @@ async def chat(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error in chat engine: {e}",
        ) from e
# non-streaming endpoint - delete if not needed
@r.post("/request")
async def chat_request(
    data: ChatData,
) -> Result:
    last_message_content = data.get_last_message_content()
    messages = data.get_history_messages()
    doc_ids = data.get_chat_document_ids()
    filters = generate_filters(doc_ids)
    params = data.data or {}
    logger.info(
        f"Creating chat engine with filters: {str(filters)}",
    )
    engine = get_engine(filters=filters, params=params)

    response = await engine.run(
        user_msg=last_message_content,
        chat_history=messages,
        stream=False,
    )
    output = response
    if isinstance(output, AgentOutput):
        content = output.response.content
    else:
        content = json.dumps(output)

    return Result(
        result=Message(role=MessageRole.ASSISTANT, content=content),
    )
from .agent import create_workflow
__all__ = ["create_workflow"]
......@@ -2,7 +2,6 @@ import os
from typing import List
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.settings import Settings
from llama_index.core.tools import BaseTool
......@@ -11,7 +10,7 @@ from app.engine.tools import ToolFactory
from app.engine.tools.query_engine import get_query_engine_tool
def get_engine(params=None, **kwargs):
def create_workflow(params=None, **kwargs):
    if params is None:
        params = {}
    system_prompt = os.getenv("SYSTEM_PROMPT")
......
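A short sketch of the renamed factory in use, mirroring the router code above; the empty params dict and filters=None are placeholder arguments, not part of this diff.

from app.workflows import create_workflow

# Build the agent workflow and run it in streaming mode.
workflow = create_workflow(params={}, filters=None)
handler = workflow.run(user_msg="Hello", chat_history=[], stream=True)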