Skip to content
Snippets Groups Projects
Unverified Commit c5a49e0f authored by Logan's avatar Logan Committed by GitHub
Browse files

remove error ignoring during streaming (#13160)

parent 805d8ef2
No related branches found
No related tags found
No related merge requests found
......@@ -111,6 +111,8 @@ class StreamingAgentChatResponse:
_is_function_false_event: Optional[asyncio.Event] = None
# signal when an OpenAI function is being executed
_is_function_not_none_thread_event: Event = field(default_factory=Event)
# Track if an exception occurred
_exception: Optional[Exception] = None
def __post_init__(self) -> None:
if self.sources and not self.source_nodes:
......@@ -147,7 +149,6 @@ class StreamingAgentChatResponse:
self,
memory: BaseMemory,
on_stream_end_fn: Optional[callable] = None,
raise_error: bool = False,
) -> None:
if self.chat_stream is None:
raise ValueError(
......@@ -176,12 +177,14 @@ class StreamingAgentChatResponse:
memory.put(chat.message)
except Exception as e:
dispatch_event(StreamChatErrorEvent(exception=e))
if not raise_error:
logger.warning(
f"Encountered exception writing response to history: {e}"
)
else:
raise
self._exception = e
# This act as is_done events for any consumers waiting
self._is_function_not_none_thread_event.set()
# force the queue reader to see the exception
self.put_in_queue("")
raise
dispatch_event(StreamChatEndEvent())
self._is_done = True
......@@ -230,7 +233,15 @@ class StreamingAgentChatResponse:
memory.put(chat.message)
except Exception as e:
dispatch_event(StreamChatErrorEvent(exception=e))
logger.warning(f"Encountered exception writing response to history: {e}")
self._exception = e
# These act as is_done events for any consumers waiting
self._is_function_false_event.set()
self._new_item_event.set()
# force the queue reader to see the exception
self.aput_in_queue("")
raise
dispatch_event(StreamChatEndEvent())
self._is_done = True
......@@ -243,6 +254,9 @@ class StreamingAgentChatResponse:
@property
def response_gen(self) -> Generator[str, None, None]:
while not self._is_done or not self._queue.empty():
if self._exception is not None:
raise self._exception
try:
delta = self._queue.get(block=False)
self._unformatted_response += delta
......@@ -256,6 +270,9 @@ class StreamingAgentChatResponse:
self._ensure_async_setup()
while True:
if not self._aqueue.empty() or not self._is_done:
if self._exception is not None:
raise self._exception
try:
delta = await asyncio.wait_for(self._aqueue.get(), timeout=0.1)
except asyncio.TimeoutError:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment