diff --git a/src/LlamaCppClient.php b/src/LlamaCppClient.php index 5f756c57096168ace4c8eebf48c190e965a3a1a8..455e51a0173edffa8c2b218685c110ae4426de8c 100644 --- a/src/LlamaCppClient.php +++ b/src/LlamaCppClient.php @@ -78,10 +78,14 @@ readonly class LlamaCppClient $responseChunks = $this->streamResponse($request, '/infill'); foreach ($responseChunks as $responseChunk) { + if ($responseChunk instanceof SwooleChannelIteratorError) { + throw new RuntimeException('Unable to generate infill'); + } + /** * @var object{ content: string } */ - $token = $this->jsonSerializer->unserialize($responseChunk->chunk); + $token = $this->jsonSerializer->unserialize($responseChunk->data->chunk); yield new LlamaCppInfill( after: $request->after, diff --git a/src/LlamaCppCompletionIterator.php b/src/LlamaCppCompletionIterator.php index be43c8dc7802103f4bf5aece45a0d9b8973d17c0..f4eb21a75721b09b902f67a716a8fb4e46f41fa8 100644 --- a/src/LlamaCppCompletionIterator.php +++ b/src/LlamaCppCompletionIterator.php @@ -35,9 +35,7 @@ readonly class LlamaCppCompletionIterator implements IteratorAggregate $previousChunk = ''; foreach ($this->responseChunks as $responseChunk) { - if (ObservableTaskStatus::Failed === $responseChunk->status) { - $previousChunk = ''; - + if ($responseChunk instanceof SwooleChannelIteratorError || ObservableTaskStatus::Failed === $responseChunk->data->status) { yield new LlamaCppCompletionToken( content: '', isFailed: true, @@ -47,7 +45,7 @@ readonly class LlamaCppCompletionIterator implements IteratorAggregate break; } - $previousChunk .= $responseChunk->chunk; + $previousChunk .= $responseChunk->data->chunk; /** * @var null|object{ diff --git a/src/ObservableTaskTable.php b/src/ObservableTaskTable.php index b168bed6f4ac441041df3a79435a2503e5487f5f..d59d1909cea8c6f9ec19effb971c6688ebbd15b1 100644 --- a/src/ObservableTaskTable.php +++ b/src/ObservableTaskTable.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use DateTimeImmutable; use Distantmagic\Resonance\Attribute\Singleton; use Ds\Set; use Generator; @@ -39,6 +40,7 @@ readonly class ObservableTaskTable implements IteratorAggregate $this->table = new Table(2 * $observableTaskConfiguration->maxTasks); $this->table->column('category', Table::TYPE_STRING, 255); + $this->table->column('modified_at', Table::TYPE_INT); $this->table->column('name', Table::TYPE_STRING, 255); $this->table->column('status', Table::TYPE_STRING, $observableTaskConfiguration->serializedStatusSize); $this->table->create(); @@ -91,6 +93,7 @@ readonly class ObservableTaskTable implements IteratorAggregate if ( !$this->table->set($slotId, [ 'category' => $observableTask->getCategory(), + 'modified_at' => time(), 'name' => $observableTask->getName(), 'status' => $this->serializedPendingStatus, ]) @@ -102,6 +105,7 @@ readonly class ObservableTaskTable implements IteratorAggregate if ( !$this->table->set($slotId, [ 'category' => $observableTask->getCategory(), + 'modified_at' => time(), 'name' => $observableTask->getName(), 'status' => $this->serializer->serialize($statusUpdate), ]) @@ -142,13 +146,18 @@ readonly class ObservableTaskTable implements IteratorAggregate $observableTaskStatusUpdate = $this->unserializeTableStatusColumn($row); - if (is_null($observableTaskStatusUpdate) || !is_string($row['name']) || !is_string($row['category'])) { + if (is_null($observableTaskStatusUpdate) + || !is_string($row['name']) + || !is_string($row['category']) + || !is_int($row['modified_at']) + ) { return null; } return new ObservableTaskTableRow( name: $row['name'], category: $row['category'], + modifiedAt: (new DateTimeImmutable())->setTimestamp($row['modified_at']), observableTaskStatusUpdate: $observableTaskStatusUpdate, ); } diff --git a/src/ObservableTaskTableRow.php b/src/ObservableTaskTableRow.php index e1db6fe304f881c263d75ef5578b723f68ae4f82..7d587cc0110f48000528c5bca17a79f0944dbe47 100644 --- a/src/ObservableTaskTableRow.php +++ b/src/ObservableTaskTableRow.php @@ -4,6 +4,8 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use DateTimeImmutable; + /** * @psalm-suppress PossiblyUnusedProperty it's used in the templates */ @@ -13,5 +15,6 @@ readonly class ObservableTaskTableRow public ObservableTaskStatusUpdate $observableTaskStatusUpdate, public string $category, public string $name, + public DateTimeImmutable $modifiedAt, ) {} } diff --git a/src/ObservableTaskTimeoutIterator.php b/src/ObservableTaskTimeoutIterator.php index 83b2f3d402bfc5ff57bbbcbcb39313f65d458f3b..38aac69e4f03586b0590dff330ba798f0f914c5e 100644 --- a/src/ObservableTaskTimeoutIterator.php +++ b/src/ObservableTaskTimeoutIterator.php @@ -31,9 +31,9 @@ readonly class ObservableTaskTimeoutIterator implements IteratorAggregate } /** - * @return SwooleChannelIterator<ObservableTaskStatusUpdate> + * @return Generator<ObservableTaskStatusUpdate> */ - public function __invoke(): SwooleChannelIterator + public function __invoke(): Generator { return $this->getIterator(); } @@ -41,9 +41,9 @@ readonly class ObservableTaskTimeoutIterator implements IteratorAggregate /** * @psalm-suppress UnusedVariable $generatorCoroutineId is used, just asynchronously * - * @return SwooleChannelIterator<ObservableTaskStatusUpdate> + * @return Generator<ObservableTaskStatusUpdate> */ - public function getIterator(): SwooleChannelIterator + public function getIterator(): Generator { /** * @var null|int $generatorCoroutineId @@ -88,9 +88,23 @@ readonly class ObservableTaskTimeoutIterator implements IteratorAggregate /** * @var SwooleChannelIterator<ObservableTaskStatusUpdate> */ - return new SwooleChannelIterator( + $swooleChannelIterator = new SwooleChannelIterator( channel: $channel, timeout: $this->inactivityTimeout, ); + + foreach ($swooleChannelIterator as $observableTaskStatusUpdate) { + if ($observableTaskStatusUpdate instanceof SwooleChannelIteratorError) { + if ($observableTaskStatusUpdate->isTimeout) { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::TimedOut, null); + } else { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Failed, null); + } + + break; + } + + yield $observableTaskStatusUpdate->data; + } } } diff --git a/src/PromptSubjectResponderAggregate.php b/src/PromptSubjectResponderAggregate.php index cd974eb1054a344acf5acb86e3ee37470edda82b..5193006f1e67ea67857f1d7ff72307e5d2846025 100644 --- a/src/PromptSubjectResponderAggregate.php +++ b/src/PromptSubjectResponderAggregate.php @@ -39,6 +39,7 @@ readonly class PromptSubjectResponderAggregate yield new PromptSubjectResponseChunk( isFailed: true, isLastChunk: true, + isTimeout: false, payload: '', ); } diff --git a/src/PromptSubjectResponse.php b/src/PromptSubjectResponse.php index cd02c328537fdbec29616f32232aa28c3ba5a476..d09d083fcf1fc1be830a0858e467f83583df6f64 100644 --- a/src/PromptSubjectResponse.php +++ b/src/PromptSubjectResponse.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use Generator; use IteratorAggregate; use Swoole\Coroutine\Channel; @@ -31,6 +32,7 @@ readonly class PromptSubjectResponse implements IteratorAggregate $this->channel->push(new PromptSubjectResponseChunk( isFailed: false, isLastChunk: true, + isTimeout: false, payload: $payload, )); } finally { @@ -39,17 +41,32 @@ readonly class PromptSubjectResponse implements IteratorAggregate } /** - * @return SwooleChannelIterator<PromptSubjectResponseChunk> + * @return Generator<PromptSubjectResponseChunk> */ - public function getIterator(): SwooleChannelIterator + public function getIterator(): Generator { /** * @var SwooleChannelIterator<PromptSubjectResponseChunk> */ - return new SwooleChannelIterator( + $swooleChannelIterator = new SwooleChannelIterator( channel: $this->channel, timeout: $this->inactivityTimeout, ); + + foreach ($swooleChannelIterator as $promptSubjectResponseChunk) { + if ($promptSubjectResponseChunk instanceof SwooleChannelIteratorError) { + yield new PromptSubjectResponseChunk( + isFailed: true, + isLastChunk: true, + isTimeout: $promptSubjectResponseChunk->isTimeout, + payload: null, + ); + + break; + } + + yield $promptSubjectResponseChunk->data; + } } public function write(mixed $payload): void @@ -57,6 +74,7 @@ readonly class PromptSubjectResponse implements IteratorAggregate $this->channel->push(new PromptSubjectResponseChunk( isFailed: false, isLastChunk: false, + isTimeout: false, payload: $payload, )); } diff --git a/src/PromptSubjectResponseChunk.php b/src/PromptSubjectResponseChunk.php index 756ed3195b434e007d3feb869e3fa211cdea2039..e05fecbbd1c4c6ddf38ea411bafadd73ef51f403 100644 --- a/src/PromptSubjectResponseChunk.php +++ b/src/PromptSubjectResponseChunk.php @@ -9,6 +9,7 @@ readonly class PromptSubjectResponseChunk public function __construct( public bool $isFailed, public bool $isLastChunk, + public bool $isTimeout, public mixed $payload, ) {} } diff --git a/src/SwooleChannelIterator.php b/src/SwooleChannelIterator.php index b11a631d1badd00effa3bebed2930625fd62ea0a..93ee37628fa972b454e5ab963acfd507a617b3a3 100644 --- a/src/SwooleChannelIterator.php +++ b/src/SwooleChannelIterator.php @@ -12,7 +12,7 @@ use Swoole\Coroutine\Channel; /** * @template TData * - * @template-implements IteratorAggregate<TData> + * @template-implements IteratorAggregate<SwooleChannelIteratorChunk<TData>|SwooleChannelIteratorError> */ readonly class SwooleChannelIterator implements IteratorAggregate { @@ -30,7 +30,7 @@ readonly class SwooleChannelIterator implements IteratorAggregate * @psalm-suppress TypeDoesNotContainType false positive, swoole channel * status * - * @return Generator<int,TData,bool> + * @return Generator<SwooleChannelIteratorChunk<TData>|SwooleChannelIteratorError> */ public function getIterator(): Generator { @@ -45,7 +45,9 @@ readonly class SwooleChannelIterator implements IteratorAggregate $data = $this->channel->pop($this->timeout); if (SWOOLE_CHANNEL_TIMEOUT === $this->channel->errCode) { - throw new RuntimeException('Channel timed out'); + yield new SwooleChannelIteratorError(isTimeout: true); + + break; } if (false === $data) { @@ -61,7 +63,7 @@ readonly class SwooleChannelIterator implements IteratorAggregate * @psalm-suppress RedundantCondition errCode might change async */ if (SWOOLE_CHANNEL_OK === $this->channel->errCode) { - yield $data; + yield new SwooleChannelIteratorChunk($data); } } while (true); } diff --git a/src/SwooleChannelIteratorChunk.php b/src/SwooleChannelIteratorChunk.php new file mode 100644 index 0000000000000000000000000000000000000000..e41f9a5b7b2477a973d5b5e98d7eddace6c59e0a --- /dev/null +++ b/src/SwooleChannelIteratorChunk.php @@ -0,0 +1,16 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +/** + * @template TData + */ +readonly class SwooleChannelIteratorChunk +{ + /** + * @param TData $data + */ + public function __construct(public mixed $data) {} +} diff --git a/src/SwooleChannelIteratorError.php b/src/SwooleChannelIteratorError.php new file mode 100644 index 0000000000000000000000000000000000000000..613db3de48be053d16666bca745bb64a08a596fe --- /dev/null +++ b/src/SwooleChannelIteratorError.php @@ -0,0 +1,12 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +readonly class SwooleChannelIteratorError +{ + public function __construct( + public bool $isTimeout, + ) {} +} diff --git a/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php b/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php index fcb48713e587d9b13386091f2aa0108b295f84e8..862c3d4b4bb21b7c574998c8cb2c441e3ee61bdf 100644 --- a/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php +++ b/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php @@ -144,8 +144,12 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketJs ; foreach ($response as $responseChunk) { - if ($responseChunk->isFailed) { - yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Failed, null); + if ($responseChunk->isFailed || $responseChunk->isTimeout) { + if ($responseChunk->isTimeout) { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::TimedOut, null); + } else { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Failed, null); + } $this->onRequestFailure( webSocketAuthResolution: $webSocketAuthResolution, diff --git a/src/views/observable_tasks_dashboard.twig b/src/views/observable_tasks_dashboard.twig index 297fed2ac1588863831d3abc43a78a1cf2a8891e..0d85c4c34c5bd9899cd7e3517a67c68df49d4bc9 100644 --- a/src/views/observable_tasks_dashboard.twig +++ b/src/views/observable_tasks_dashboard.twig @@ -13,6 +13,7 @@ <th>status</th> <th>category</th> <th>name</th> + <th>last update</th> </tr> </thead> <tbody> @@ -22,6 +23,7 @@ <td>{{ observableTask.observableTaskStatusUpdate.status.value }}</td> <td>{{ observableTask.category }}</td> <td>{{ observableTask.name }}</td> + <td>{{ observableTask.modifiedAt|intl_format_date(request) }}</td> </tr> {% endfor %} </tbody>