Unverified commit 9d8af88b authored by Logan, committed by GitHub

fix async streaming (#12187)

parent f54f2bd9
@@ -615,6 +615,8 @@ class ReActAgentWorker(BaseAgentWorker):
                 )
             )
             # wait until response writing is done
+            agent_response._ensure_async_setup()
             await agent_response._is_function_false_event.wait()
         return self._get_task_step_response(agent_response, step, is_done)
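For context, here is a minimal, assumed sketch of what `_ensure_async_setup()` is likely responsible for: lazily creating the asyncio primitives that the streaming response exposes, so a caller that invokes it before `await agent_response._is_function_false_event.wait()` is guaranteed the event exists. The attribute names mirror the diff; the method bodies below are illustrative, not the library implementation.

import asyncio
from typing import Optional


class StreamingAgentChatResponseSketch:
    """Toy stand-in for StreamingAgentChatResponse; attribute names come from
    the diff above, the bodies are assumptions."""

    def __init__(self) -> None:
        # Async primitives are created lazily so they are built while an event
        # loop is running, not at object construction time.
        self._aqueue: Optional[asyncio.Queue] = None
        self._new_item_event: Optional[asyncio.Event] = None
        self._is_function_false_event: Optional[asyncio.Event] = None

    def _ensure_async_setup(self) -> None:
        # Idempotent: each async entry point can call this safely before it
        # touches the queue or the events.
        if self._aqueue is None:
            self._aqueue = asyncio.Queue()
        if self._new_item_event is None:
            self._new_item_event = asyncio.Event()
        if self._is_function_false_event is None:
            self._is_function_false_event = asyncio.Event()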
@@ -113,7 +113,6 @@ class StreamingAgentChatResponse:
         self._is_function_not_none_thread_event.set()

     def aput_in_queue(self, delta: Optional[str]) -> None:
-        self._ensure_async_setup()
         self._aqueue.put_nowait(delta)
         self._new_item_event.set()

@@ -167,6 +166,8 @@ class StreamingAgentChatResponse:
         memory: BaseMemory,
         on_stream_end_fn: Optional[callable] = None,
     ) -> None:
+        self._ensure_async_setup()
         if self.achat_stream is None:
             raise ValueError(
                 "achat_stream is None. Cannot asynchronously write to "
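The two hunks above move the setup call out of `aput_in_queue`, which runs once per delta, and into the top of the async writer. A hedged sketch of the resulting shape, with the writer's real parameters (`memory`, `on_stream_end_fn`) and its response handling omitted for brevity:

import asyncio
from typing import AsyncIterator, Optional


class _ResponseSketch:
    """Illustrative only: trimmed to the members the two hunks above touch."""

    def __init__(self) -> None:
        self._aqueue: Optional[asyncio.Queue] = None
        self._new_item_event: Optional[asyncio.Event] = None
        self.achat_stream: Optional[AsyncIterator[str]] = None

    def _ensure_async_setup(self) -> None:
        if self._aqueue is None:
            self._aqueue = asyncio.Queue()
        if self._new_item_event is None:
            self._new_item_event = asyncio.Event()

    def aput_in_queue(self, delta: Optional[str]) -> None:
        # After the change this method no longer calls _ensure_async_setup();
        # it assumes the async writer below has already done the setup.
        self._aqueue.put_nowait(delta)
        self._new_item_event.set()

    async def awrite_response_to_history(self) -> None:
        # Setup now happens once, at the top of the coroutine that actually
        # runs inside the event loop, before any queue or event is used.
        self._ensure_async_setup()
        if self.achat_stream is None:
            raise ValueError("achat_stream is None; nothing to write")
        async for delta in self.achat_stream:
            self.aput_in_queue(delta)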
@@ -248,7 +248,9 @@ class BaseOpenAIAgent(BaseAgent):
             chat_stream_response.awrite_response_to_history(self.memory)
         )
         # wait until openAI functions stop executing
+        chat_stream_response._ensure_async_setup()
         await chat_stream_response._is_function_false_event.wait()
         # return response stream
         return chat_stream_response
@@ -28,7 +28,7 @@ exclude = ["**/BUILD"]
 license = "MIT"
 name = "llama-index-agent-openai-legacy"
 readme = "README.md"
-version = "0.1.2"
+version = "0.1.3"

 [tool.poetry.dependencies]
 python = ">=3.8.1,<4.0"
@@ -312,8 +312,11 @@ class OpenAIAgentWorker(BaseAgentWorker):
                 on_stream_end_fn=partial(self.finalize_task, task),
             )
         )
+        chat_stream_response._ensure_async_setup()
         # wait until openAI functions stop executing
         await chat_stream_response._is_function_false_event.wait()
         # return response stream
         return chat_stream_response
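Both agent-worker hunks (this one and the ReActAgentWorker hunk above) add the same guard in the streaming step. Below is a self-contained, illustrative sketch of that flow and of the race the guard presumably closes; `stream_step_sketch` and `_Resp` are invented names, not the library API:

import asyncio
from typing import Optional


class _Resp:
    """Minimal stand-in: just the pieces this code path touches."""

    def __init__(self) -> None:
        self._is_function_false_event: Optional[asyncio.Event] = None

    def _ensure_async_setup(self) -> None:
        if self._is_function_false_event is None:
            self._is_function_false_event = asyncio.Event()

    async def awrite_response_to_history(self) -> None:
        self._ensure_async_setup()
        # ... stream deltas to the queue, then signal that no tool/function
        # call is pending so the caller can hand back the stream:
        self._is_function_false_event.set()


async def stream_step_sketch() -> _Resp:
    resp = _Resp()
    # Spawn the background writer that will eventually set the event.
    asyncio.ensure_future(resp.awrite_response_to_history())
    # The added guard: create the event in this coroutine too, so the await
    # below never touches a not-yet-initialized attribute if it runs before
    # the writer task gets scheduled.
    resp._ensure_async_setup()
    # wait until openAI functions stop executing
    await resp._is_function_false_event.wait()
    # return response stream
    return resp


if __name__ == "__main__":
    asyncio.run(stream_step_sketch())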
@@ -28,7 +28,7 @@ exclude = ["**/BUILD"]
 license = "MIT"
 name = "llama-index-agent-openai"
 readme = "README.md"
-version = "0.1.6"
+version = "0.1.7"

 [tool.poetry.dependencies]
 python = ">=3.8.1,<4.0"