From 4c25116d4f58d5739c3a80d91b7162d2ee465206 Mon Sep 17 00:00:00 2001 From: Mateusz Charytoniuk <mateusz.charytoniuk@protonmail.com> Date: Sat, 23 Mar 2024 22:02:08 +0100 Subject: [PATCH] chore: handle swoole timeouts without throwing exceptions --- src/LlamaCppClient.php | 6 ++++- src/LlamaCppCompletionIterator.php | 6 ++--- src/ObservableTaskTable.php | 11 ++++++++- src/ObservableTaskTableRow.php | 3 +++ src/ObservableTaskTimeoutIterator.php | 24 +++++++++++++++---- src/PromptSubjectResponderAggregate.php | 1 + src/PromptSubjectResponse.php | 24 ++++++++++++++++--- src/PromptSubjectResponseChunk.php | 1 + src/SwooleChannelIterator.php | 10 ++++---- src/SwooleChannelIteratorChunk.php | 16 +++++++++++++ src/SwooleChannelIteratorError.php | 12 ++++++++++ .../LlamaCppSubjectActionPromptResponder.php | 8 +++++-- src/views/observable_tasks_dashboard.twig | 2 ++ 13 files changed, 104 insertions(+), 20 deletions(-) create mode 100644 src/SwooleChannelIteratorChunk.php create mode 100644 src/SwooleChannelIteratorError.php diff --git a/src/LlamaCppClient.php b/src/LlamaCppClient.php index 5f756c57..455e51a0 100644 --- a/src/LlamaCppClient.php +++ b/src/LlamaCppClient.php @@ -78,10 +78,14 @@ readonly class LlamaCppClient $responseChunks = $this->streamResponse($request, '/infill'); foreach ($responseChunks as $responseChunk) { + if ($responseChunk instanceof SwooleChannelIteratorError) { + throw new RuntimeException('Unable to generate infill'); + } + /** * @var object{ content: string } */ - $token = $this->jsonSerializer->unserialize($responseChunk->chunk); + $token = $this->jsonSerializer->unserialize($responseChunk->data->chunk); yield new LlamaCppInfill( after: $request->after, diff --git a/src/LlamaCppCompletionIterator.php b/src/LlamaCppCompletionIterator.php index be43c8dc..f4eb21a7 100644 --- a/src/LlamaCppCompletionIterator.php +++ b/src/LlamaCppCompletionIterator.php @@ -35,9 +35,7 @@ readonly class LlamaCppCompletionIterator implements IteratorAggregate $previousChunk = ''; foreach ($this->responseChunks as $responseChunk) { - if (ObservableTaskStatus::Failed === $responseChunk->status) { - $previousChunk = ''; - + if ($responseChunk instanceof SwooleChannelIteratorError || ObservableTaskStatus::Failed === $responseChunk->data->status) { yield new LlamaCppCompletionToken( content: '', isFailed: true, @@ -47,7 +45,7 @@ readonly class LlamaCppCompletionIterator implements IteratorAggregate break; } - $previousChunk .= $responseChunk->chunk; + $previousChunk .= $responseChunk->data->chunk; /** * @var null|object{ diff --git a/src/ObservableTaskTable.php b/src/ObservableTaskTable.php index b168bed6..d59d1909 100644 --- a/src/ObservableTaskTable.php +++ b/src/ObservableTaskTable.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use DateTimeImmutable; use Distantmagic\Resonance\Attribute\Singleton; use Ds\Set; use Generator; @@ -39,6 +40,7 @@ readonly class ObservableTaskTable implements IteratorAggregate $this->table = new Table(2 * $observableTaskConfiguration->maxTasks); $this->table->column('category', Table::TYPE_STRING, 255); + $this->table->column('modified_at', Table::TYPE_INT); $this->table->column('name', Table::TYPE_STRING, 255); $this->table->column('status', Table::TYPE_STRING, $observableTaskConfiguration->serializedStatusSize); $this->table->create(); @@ -91,6 +93,7 @@ readonly class ObservableTaskTable implements IteratorAggregate if ( !$this->table->set($slotId, [ 'category' => $observableTask->getCategory(), + 'modified_at' => time(), 'name' => $observableTask->getName(), 'status' => $this->serializedPendingStatus, ]) @@ -102,6 +105,7 @@ readonly class ObservableTaskTable implements IteratorAggregate if ( !$this->table->set($slotId, [ 'category' => $observableTask->getCategory(), + 'modified_at' => time(), 'name' => $observableTask->getName(), 'status' => $this->serializer->serialize($statusUpdate), ]) @@ -142,13 +146,18 @@ readonly class ObservableTaskTable implements IteratorAggregate $observableTaskStatusUpdate = $this->unserializeTableStatusColumn($row); - if (is_null($observableTaskStatusUpdate) || !is_string($row['name']) || !is_string($row['category'])) { + if (is_null($observableTaskStatusUpdate) + || !is_string($row['name']) + || !is_string($row['category']) + || !is_int($row['modified_at']) + ) { return null; } return new ObservableTaskTableRow( name: $row['name'], category: $row['category'], + modifiedAt: (new DateTimeImmutable())->setTimestamp($row['modified_at']), observableTaskStatusUpdate: $observableTaskStatusUpdate, ); } diff --git a/src/ObservableTaskTableRow.php b/src/ObservableTaskTableRow.php index e1db6fe3..7d587cc0 100644 --- a/src/ObservableTaskTableRow.php +++ b/src/ObservableTaskTableRow.php @@ -4,6 +4,8 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use DateTimeImmutable; + /** * @psalm-suppress PossiblyUnusedProperty it's used in the templates */ @@ -13,5 +15,6 @@ readonly class ObservableTaskTableRow public ObservableTaskStatusUpdate $observableTaskStatusUpdate, public string $category, public string $name, + public DateTimeImmutable $modifiedAt, ) {} } diff --git a/src/ObservableTaskTimeoutIterator.php b/src/ObservableTaskTimeoutIterator.php index 83b2f3d4..38aac69e 100644 --- a/src/ObservableTaskTimeoutIterator.php +++ b/src/ObservableTaskTimeoutIterator.php @@ -31,9 +31,9 @@ readonly class ObservableTaskTimeoutIterator implements IteratorAggregate } /** - * @return SwooleChannelIterator<ObservableTaskStatusUpdate> + * @return Generator<ObservableTaskStatusUpdate> */ - public function __invoke(): SwooleChannelIterator + public function __invoke(): Generator { return $this->getIterator(); } @@ -41,9 +41,9 @@ readonly class ObservableTaskTimeoutIterator implements IteratorAggregate /** * @psalm-suppress UnusedVariable $generatorCoroutineId is used, just asynchronously * - * @return SwooleChannelIterator<ObservableTaskStatusUpdate> + * @return Generator<ObservableTaskStatusUpdate> */ - public function getIterator(): SwooleChannelIterator + public function getIterator(): Generator { /** * @var null|int $generatorCoroutineId @@ -88,9 +88,23 @@ readonly class ObservableTaskTimeoutIterator implements IteratorAggregate /** * @var SwooleChannelIterator<ObservableTaskStatusUpdate> */ - return new SwooleChannelIterator( + $swooleChannelIterator = new SwooleChannelIterator( channel: $channel, timeout: $this->inactivityTimeout, ); + + foreach ($swooleChannelIterator as $observableTaskStatusUpdate) { + if ($observableTaskStatusUpdate instanceof SwooleChannelIteratorError) { + if ($observableTaskStatusUpdate->isTimeout) { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::TimedOut, null); + } else { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Failed, null); + } + + break; + } + + yield $observableTaskStatusUpdate->data; + } } } diff --git a/src/PromptSubjectResponderAggregate.php b/src/PromptSubjectResponderAggregate.php index cd974eb1..5193006f 100644 --- a/src/PromptSubjectResponderAggregate.php +++ b/src/PromptSubjectResponderAggregate.php @@ -39,6 +39,7 @@ readonly class PromptSubjectResponderAggregate yield new PromptSubjectResponseChunk( isFailed: true, isLastChunk: true, + isTimeout: false, payload: '', ); } diff --git a/src/PromptSubjectResponse.php b/src/PromptSubjectResponse.php index cd02c328..d09d083f 100644 --- a/src/PromptSubjectResponse.php +++ b/src/PromptSubjectResponse.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use Generator; use IteratorAggregate; use Swoole\Coroutine\Channel; @@ -31,6 +32,7 @@ readonly class PromptSubjectResponse implements IteratorAggregate $this->channel->push(new PromptSubjectResponseChunk( isFailed: false, isLastChunk: true, + isTimeout: false, payload: $payload, )); } finally { @@ -39,17 +41,32 @@ readonly class PromptSubjectResponse implements IteratorAggregate } /** - * @return SwooleChannelIterator<PromptSubjectResponseChunk> + * @return Generator<PromptSubjectResponseChunk> */ - public function getIterator(): SwooleChannelIterator + public function getIterator(): Generator { /** * @var SwooleChannelIterator<PromptSubjectResponseChunk> */ - return new SwooleChannelIterator( + $swooleChannelIterator = new SwooleChannelIterator( channel: $this->channel, timeout: $this->inactivityTimeout, ); + + foreach ($swooleChannelIterator as $promptSubjectResponseChunk) { + if ($promptSubjectResponseChunk instanceof SwooleChannelIteratorError) { + yield new PromptSubjectResponseChunk( + isFailed: true, + isLastChunk: true, + isTimeout: $promptSubjectResponseChunk->isTimeout, + payload: null, + ); + + break; + } + + yield $promptSubjectResponseChunk->data; + } } public function write(mixed $payload): void @@ -57,6 +74,7 @@ readonly class PromptSubjectResponse implements IteratorAggregate $this->channel->push(new PromptSubjectResponseChunk( isFailed: false, isLastChunk: false, + isTimeout: false, payload: $payload, )); } diff --git a/src/PromptSubjectResponseChunk.php b/src/PromptSubjectResponseChunk.php index 756ed319..e05fecbb 100644 --- a/src/PromptSubjectResponseChunk.php +++ b/src/PromptSubjectResponseChunk.php @@ -9,6 +9,7 @@ readonly class PromptSubjectResponseChunk public function __construct( public bool $isFailed, public bool $isLastChunk, + public bool $isTimeout, public mixed $payload, ) {} } diff --git a/src/SwooleChannelIterator.php b/src/SwooleChannelIterator.php index b11a631d..93ee3762 100644 --- a/src/SwooleChannelIterator.php +++ b/src/SwooleChannelIterator.php @@ -12,7 +12,7 @@ use Swoole\Coroutine\Channel; /** * @template TData * - * @template-implements IteratorAggregate<TData> + * @template-implements IteratorAggregate<SwooleChannelIteratorChunk<TData>|SwooleChannelIteratorError> */ readonly class SwooleChannelIterator implements IteratorAggregate { @@ -30,7 +30,7 @@ readonly class SwooleChannelIterator implements IteratorAggregate * @psalm-suppress TypeDoesNotContainType false positive, swoole channel * status * - * @return Generator<int,TData,bool> + * @return Generator<SwooleChannelIteratorChunk<TData>|SwooleChannelIteratorError> */ public function getIterator(): Generator { @@ -45,7 +45,9 @@ readonly class SwooleChannelIterator implements IteratorAggregate $data = $this->channel->pop($this->timeout); if (SWOOLE_CHANNEL_TIMEOUT === $this->channel->errCode) { - throw new RuntimeException('Channel timed out'); + yield new SwooleChannelIteratorError(isTimeout: true); + + break; } if (false === $data) { @@ -61,7 +63,7 @@ readonly class SwooleChannelIterator implements IteratorAggregate * @psalm-suppress RedundantCondition errCode might change async */ if (SWOOLE_CHANNEL_OK === $this->channel->errCode) { - yield $data; + yield new SwooleChannelIteratorChunk($data); } } while (true); } diff --git a/src/SwooleChannelIteratorChunk.php b/src/SwooleChannelIteratorChunk.php new file mode 100644 index 00000000..e41f9a5b --- /dev/null +++ b/src/SwooleChannelIteratorChunk.php @@ -0,0 +1,16 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +/** + * @template TData + */ +readonly class SwooleChannelIteratorChunk +{ + /** + * @param TData $data + */ + public function __construct(public mixed $data) {} +} diff --git a/src/SwooleChannelIteratorError.php b/src/SwooleChannelIteratorError.php new file mode 100644 index 00000000..613db3de --- /dev/null +++ b/src/SwooleChannelIteratorError.php @@ -0,0 +1,12 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +readonly class SwooleChannelIteratorError +{ + public function __construct( + public bool $isTimeout, + ) {} +} diff --git a/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php b/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php index fcb48713..862c3d4b 100644 --- a/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php +++ b/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php @@ -144,8 +144,12 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketJs ; foreach ($response as $responseChunk) { - if ($responseChunk->isFailed) { - yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Failed, null); + if ($responseChunk->isFailed || $responseChunk->isTimeout) { + if ($responseChunk->isTimeout) { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::TimedOut, null); + } else { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Failed, null); + } $this->onRequestFailure( webSocketAuthResolution: $webSocketAuthResolution, diff --git a/src/views/observable_tasks_dashboard.twig b/src/views/observable_tasks_dashboard.twig index 297fed2a..0d85c4c3 100644 --- a/src/views/observable_tasks_dashboard.twig +++ b/src/views/observable_tasks_dashboard.twig @@ -13,6 +13,7 @@ <th>status</th> <th>category</th> <th>name</th> + <th>last update</th> </tr> </thead> <tbody> @@ -22,6 +23,7 @@ <td>{{ observableTask.observableTaskStatusUpdate.status.value }}</td> <td>{{ observableTask.category }}</td> <td>{{ observableTask.name }}</td> + <td>{{ observableTask.modifiedAt|intl_format_date(request) }}</td> </tr> {% endfor %} </tbody> -- GitLab