Skip to content
Snippets Groups Projects
Unverified Commit a9ff814f authored by Sherif Abdekarim's avatar Sherif Abdekarim Committed by GitHub
Browse files

Create lazy initialization for async elements in StreamingAgentChatResponse (#12116)

parent a94a68d2
No related branches found
No related tags found
No related merge requests found
......@@ -72,17 +72,17 @@ class StreamingAgentChatResponse:
source_nodes: List[NodeWithScore] = field(default_factory=list)
_unformatted_response: str = ""
_queue: queue.Queue = field(default_factory=queue.Queue)
_aqueue: asyncio.Queue = field(default_factory=asyncio.Queue)
_aqueue: Optional[asyncio.Queue] = None
# flag when chat message is a function call
_is_function: Optional[bool] = None
# flag when processing done
_is_done = False
# signal when a new item is added to the queue
_new_item_event: asyncio.Event = field(default_factory=asyncio.Event)
_new_item_event: Optional[asyncio.Event] = None
# NOTE: async code uses two events rather than one since it yields
# control when waiting for queue item
# signal when the OpenAI functions stop executing
_is_function_false_event: asyncio.Event = field(default_factory=asyncio.Event)
_is_function_false_event: Optional[asyncio.Event] = None
# signal when an OpenAI function is being executed
_is_function_not_none_thread_event: Event = field(default_factory=Event)
......@@ -100,11 +100,20 @@ class StreamingAgentChatResponse:
self.response = self._unformatted_response.strip()
return self.response
def _ensure_async_setup(self) -> None:
if self._aqueue is None:
self._aqueue = asyncio.Queue()
if self._new_item_event is None:
self._new_item_event = asyncio.Event()
if self._is_function_false_event is None:
self._is_function_false_event = asyncio.Event()
def put_in_queue(self, delta: Optional[str]) -> None:
self._queue.put_nowait(delta)
self._is_function_not_none_thread_event.set()
def aput_in_queue(self, delta: Optional[str]) -> None:
self._ensure_async_setup()
self._aqueue.put_nowait(delta)
self._new_item_event.set()
......@@ -207,6 +216,7 @@ class StreamingAgentChatResponse:
self.response = self._unformatted_response.strip()
async def async_response_gen(self) -> AsyncGenerator[str, None]:
self._ensure_async_setup()
while True:
if not self._aqueue.empty() or not self._is_done:
try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment