From c5a49e0f6609d914dde96c9600c98c335896928b Mon Sep 17 00:00:00 2001 From: Logan <logan.markewich@live.com> Date: Tue, 30 Apr 2024 09:31:55 -0600 Subject: [PATCH] remove error ignoring during streaming (#13160) --- .../llama_index/core/chat_engine/types.py | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/llama-index-core/llama_index/core/chat_engine/types.py b/llama-index-core/llama_index/core/chat_engine/types.py index a987281f61..1163806622 100644 --- a/llama-index-core/llama_index/core/chat_engine/types.py +++ b/llama-index-core/llama_index/core/chat_engine/types.py @@ -111,6 +111,8 @@ class StreamingAgentChatResponse: _is_function_false_event: Optional[asyncio.Event] = None # signal when an OpenAI function is being executed _is_function_not_none_thread_event: Event = field(default_factory=Event) + # Track if an exception occurred + _exception: Optional[Exception] = None def __post_init__(self) -> None: if self.sources and not self.source_nodes: @@ -147,7 +149,6 @@ class StreamingAgentChatResponse: self, memory: BaseMemory, on_stream_end_fn: Optional[callable] = None, - raise_error: bool = False, ) -> None: if self.chat_stream is None: raise ValueError( @@ -176,12 +177,14 @@ class StreamingAgentChatResponse: memory.put(chat.message) except Exception as e: dispatch_event(StreamChatErrorEvent(exception=e)) - if not raise_error: - logger.warning( - f"Encountered exception writing response to history: {e}" - ) - else: - raise + self._exception = e + + # This act as is_done events for any consumers waiting + self._is_function_not_none_thread_event.set() + + # force the queue reader to see the exception + self.put_in_queue("") + raise dispatch_event(StreamChatEndEvent()) self._is_done = True @@ -230,7 +233,15 @@ class StreamingAgentChatResponse: memory.put(chat.message) except Exception as e: dispatch_event(StreamChatErrorEvent(exception=e)) - logger.warning(f"Encountered exception writing response to history: {e}") + self._exception = e + + # These act as is_done events for any consumers waiting + self._is_function_false_event.set() + self._new_item_event.set() + + # force the queue reader to see the exception + self.aput_in_queue("") + raise dispatch_event(StreamChatEndEvent()) self._is_done = True @@ -243,6 +254,9 @@ class StreamingAgentChatResponse: @property def response_gen(self) -> Generator[str, None, None]: while not self._is_done or not self._queue.empty(): + if self._exception is not None: + raise self._exception + try: delta = self._queue.get(block=False) self._unformatted_response += delta @@ -256,6 +270,9 @@ class StreamingAgentChatResponse: self._ensure_async_setup() while True: if not self._aqueue.empty() or not self._is_done: + if self._exception is not None: + raise self._exception + try: delta = await asyncio.wait_for(self._aqueue.get(), timeout=0.1) except asyncio.TimeoutError: -- GitLab