diff --git a/src/HttpResponder/ObservableTasksDashboard.php b/src/HttpResponder/ObservableTasksDashboard.php index daab39c5471ac20d702085f8d5c23e908c45f920..1b91fc40ac341f8900c2ff981c2f93ae478e5c6c 100644 --- a/src/HttpResponder/ObservableTasksDashboard.php +++ b/src/HttpResponder/ObservableTasksDashboard.php @@ -5,15 +5,29 @@ declare(strict_types=1); namespace Distantmagic\Resonance\HttpResponder; use Distantmagic\Resonance\Attribute\Singleton; +use Distantmagic\Resonance\HttpInterceptableInterface; use Distantmagic\Resonance\HttpResponder; +use Distantmagic\Resonance\ObservableTaskTable; +use Distantmagic\Resonance\TwigTemplate; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; #[Singleton] readonly class ObservableTasksDashboard extends HttpResponder { - public function respond(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface + public function __construct( + private ObservableTaskTable $observableTaskTable, + ) {} + + public function respond(ServerRequestInterface $request, ResponseInterface $response): HttpInterceptableInterface { - return $response->withBody($this->createStream('{"tasks": []}')); + return new TwigTemplate( + $request, + $response, + '@resonance/observable_tasks_dashboard.twig', + [ + 'observableTaskTable' => $this->observableTaskTable, + ] + ); } } diff --git a/src/ObservableTaskSlotStatusUpdate.php b/src/ObservableTaskSlotStatusUpdate.php index 9437b87de36908f21ce330b7f77cc7d5e7e673d2..66b4bdbea13c8bf50063f430fa1dc2febe033f54 100644 --- a/src/ObservableTaskSlotStatusUpdate.php +++ b/src/ObservableTaskSlotStatusUpdate.php @@ -4,10 +4,12 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use JsonSerializable; + /** * @template TData */ -readonly class ObservableTaskSlotStatusUpdate +readonly class ObservableTaskSlotStatusUpdate implements JsonSerializable { /** * @param ObservableTaskStatusUpdate<TData> $observableTaskStatusUpdate @@ -16,4 +18,12 @@ readonly class ObservableTaskSlotStatusUpdate public string $slotId, public ObservableTaskStatusUpdate $observableTaskStatusUpdate, ) {} + + public function jsonSerialize(): array + { + return [ + 'slot_id' => $this->slotId, + 'observable_task_status_update' => $this->observableTaskStatusUpdate, + ]; + } } diff --git a/src/ObservableTaskStatusUpdate.php b/src/ObservableTaskStatusUpdate.php index 8cd856fd8b0aeabd4f6538d371380c9d4a167acd..bfcbe8fe49550310798b0bd5e21ee60a799ae69d 100644 --- a/src/ObservableTaskStatusUpdate.php +++ b/src/ObservableTaskStatusUpdate.php @@ -4,10 +4,12 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use JsonSerializable; + /** * @template TData */ -readonly class ObservableTaskStatusUpdate +readonly class ObservableTaskStatusUpdate implements JsonSerializable { /** * @param TData $data @@ -16,4 +18,12 @@ readonly class ObservableTaskStatusUpdate public ObservableTaskStatus $status, public mixed $data ) {} + + public function jsonSerialize(): array + { + return [ + 'status' => $this->status->value, + 'data' => $this->data, + ]; + } } diff --git a/src/ObservableTaskTable.php b/src/ObservableTaskTable.php index 5d8d2251257d3cdca6b6a92242c67ea58b1fc9b0..fe2432d7186f78859b837eaf55a9e2311f866d8a 100644 --- a/src/ObservableTaskTable.php +++ b/src/ObservableTaskTable.php @@ -6,11 +6,16 @@ namespace Distantmagic\Resonance; use Distantmagic\Resonance\Attribute\Singleton; use Ds\Set; +use Generator; +use IteratorAggregate; use Swoole\Coroutine\Channel; use Swoole\Table; +/** + * @template-implements IteratorAggregate<non-empty-string,?ObservableTaskStatusUpdate> + */ #[Singleton] -readonly class ObservableTaskTable +readonly class ObservableTaskTable implements IteratorAggregate { /** * @var Set<Channel> @@ -41,6 +46,20 @@ readonly class ObservableTaskTable $this->table->destroy(); } + /** + * @return Generator<non-empty-string,?ObservableTaskStatusUpdate> + */ + public function getIterator(): Generator + { + /** + * @var non-empty-string $slotId + * @var mixed $row explicitly mixed for typechecks + */ + foreach ($this->table as $slotId => $row) { + yield $slotId => $this->unserializeTableRow($row); + } + } + /** * @param non-empty-string $taskId */ diff --git a/src/ObservableTaskTableSlotStatusUpdateIterator.php b/src/ObservableTaskTableSlotStatusUpdateIterator.php index af68dcceec5ef67d0faff60adc728d9db0aa5f21..29bc4102d1d1d38aea86d040bd36fc07a6df6f21 100644 --- a/src/ObservableTaskTableSlotStatusUpdateIterator.php +++ b/src/ObservableTaskTableSlotStatusUpdateIterator.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use Ds\Set; use Generator; use IteratorAggregate; use Swoole\Coroutine\Channel; @@ -13,10 +14,24 @@ use Swoole\Coroutine\Channel; */ readonly class ObservableTaskTableSlotStatusUpdateIterator implements IteratorAggregate { + /** + * @var Set<SwooleChannelIterator> + */ + private Set $swooleChannelIterators; + public function __construct( private ObservableTaskTable $observableTaskTable, private float $timeout = -1, - ) {} + ) { + $this->swooleChannelIterators = new Set(); + } + + public function close(): void + { + foreach ($this->swooleChannelIterators as $swooleChannelIterator) { + $swooleChannelIterator->close(); + } + } /** * @return Generator<ObservableTaskSlotStatusUpdate> @@ -28,12 +43,18 @@ readonly class ObservableTaskTableSlotStatusUpdateIterator implements IteratorAg $this->observableTaskTable->observableChannels->add($channel); try { + $swooleChannelIterator = new SwooleChannelIterator($channel, $this->timeout); + + $this->swooleChannelIterators->add($swooleChannelIterator); + /** * @var ObservableTaskSlotStatusUpdate $observableTaskSlotStatusUpdate */ - foreach (new SwooleChannelIterator($channel, $this->timeout) as $observableTaskSlotStatusUpdate) { + foreach ($swooleChannelIterator as $observableTaskSlotStatusUpdate) { yield $observableTaskSlotStatusUpdate; } + + $this->swooleChannelIterators->remove($swooleChannelIterator); } finally { $this->observableTaskTable->observableChannels->remove($channel); } diff --git a/src/SingletonProvider/TwigEnvironmentProvider.php b/src/SingletonProvider/TwigEnvironmentProvider.php index ac5d1af460ffadf8963d60be09c249112037a7ba..433c88ba124efa18d61701747a2ba81bbd949e0a 100644 --- a/src/SingletonProvider/TwigEnvironmentProvider.php +++ b/src/SingletonProvider/TwigEnvironmentProvider.php @@ -22,6 +22,7 @@ use Twig\Cache\FilesystemCache; use Twig\Environment as TwigEnvironment; use Twig\Error\Error; use Twig\Extension\ExtensionInterface; +use Twig\Loader\FilesystemLoader; /** * @template-extends SingletonProvider<TwigEnvironment> @@ -80,8 +81,15 @@ final readonly class TwigEnvironmentProvider extends SingletonProvider private function warmupCache(TwigEnvironment $twigEnvironment): void { - $viewsDirectory = DM_APP_ROOT.'/views'; + $this->warmupCacheDirectory($twigEnvironment, DM_APP_ROOT.'/views', FilesystemLoader::MAIN_NAMESPACE); + $this->warmupCacheDirectory($twigEnvironment, DM_RESONANCE_ROOT.'/views', 'resonance'); + } + private function warmupCacheDirectory( + TwigEnvironment $twigEnvironment, + string $viewsDirectory, + string $namespace, + ): void { if (!is_dir($viewsDirectory)) { return; } @@ -99,7 +107,11 @@ final readonly class TwigEnvironmentProvider extends SingletonProvider $relativePathname = $file->getRelativePathname(); try { - $twigEnvironment->load($relativePathname); + $twigEnvironment->load(sprintf( + '@%s/%s', + $namespace, + $relativePathname, + )); } catch (Error $error) { $this->logger->warning(sprintf( 'twig_cache_warmup_error("%s", %d): %s', diff --git a/src/WebSocketRPCResponder/LlamaCppSubjectActionPromptResponder.php b/src/WebSocketRPCResponder/LlamaCppSubjectActionPromptResponder.php index 1387ad96e6eca5bc92123cabfd316baf1c9cbf9b..7d996fd0fcbfb1043f91ee6e85f7915108ccb054 100644 --- a/src/WebSocketRPCResponder/LlamaCppSubjectActionPromptResponder.php +++ b/src/WebSocketRPCResponder/LlamaCppSubjectActionPromptResponder.php @@ -11,14 +11,17 @@ use Distantmagic\Resonance\LlamaCppCompletionRequest; use Distantmagic\Resonance\LlmPrompt\SubjectActionPrompt; use Distantmagic\Resonance\LlmPromptTemplate; use Distantmagic\Resonance\LlmPromptTemplate\ChainPrompt; +use Distantmagic\Resonance\ObservableTask; +use Distantmagic\Resonance\ObservableTaskStatus; +use Distantmagic\Resonance\ObservableTaskStatusUpdate; +use Distantmagic\Resonance\ObservableTaskTable; use Distantmagic\Resonance\PromptSubjectResponderAggregate; -use Distantmagic\Resonance\RPCNotification; use Distantmagic\Resonance\RPCRequest; use Distantmagic\Resonance\WebSocketAuthResolution; use Distantmagic\Resonance\WebSocketConnection; use Distantmagic\Resonance\WebSocketRPCResponder; +use Generator; use Psr\Log\LoggerInterface; -use RuntimeException; use WeakMap; /** @@ -51,6 +54,7 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketRP public function __construct( private LlamaCppClient $llamaCppClient, private LoggerInterface $logger, + private ObservableTaskTable $observableTaskTable, private PromptSubjectResponderAggregate $promptSubjectResponderAggregate, private SubjectActionGrammar $subjectActionGrammar, private SubjectActionPrompt $subjectActionPrompt, @@ -70,15 +74,39 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketRP } } - public function onNotification( + public function onRequest( WebSocketAuthResolution $webSocketAuthResolution, WebSocketConnection $webSocketConnection, - RPCNotification $rpcNotification, - ): never { - throw new RuntimeException('Unexpected notification'); + RPCRequest $rpcRequest, + ): void { + $this->observableTaskTable->observe(new ObservableTask( + /** + * @return Generator<ObservableTaskStatusUpdate> + */ + function () use ( + $webSocketAuthResolution, + $webSocketConnection, + $rpcRequest, + ): Generator { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Running, null); + + try { + $this->onObservableRequest( + $webSocketAuthResolution, + $webSocketConnection, + $rpcRequest, + ); + } finally { + yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Finished, null); + } + } + )); } - public function onRequest( + /** + * @param RPCRequest<TPayload> $rpcRequest + */ + private function onObservableRequest( WebSocketAuthResolution $webSocketAuthResolution, WebSocketConnection $webSocketConnection, RPCRequest $rpcRequest, @@ -95,8 +123,7 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketRP $this->runningCompletions->offsetSet($webSocketConnection, $completion); - $response = $this - ->promptSubjectResponderAggregate + $response = $this->promptSubjectResponderAggregate ->createResponseFromTokens( authenticatedUser: $webSocketAuthResolution->authenticatedUser, completion: $completion, diff --git a/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php b/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php index 40b4ec66f55c3f3492ce3a932a87efd81f516596..a51d99a9b7b4cd45a4b1d9dc3f7e088075a6390d 100644 --- a/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php +++ b/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php @@ -6,14 +6,14 @@ namespace Distantmagic\Resonance\WebSocketRPCResponder; use Distantmagic\Resonance\Constraint; use Distantmagic\Resonance\Constraint\ObjectConstraint; -use Distantmagic\Resonance\LlamaCppCompletionIterator; -use Distantmagic\Resonance\RPCNotification; +use Distantmagic\Resonance\ObservableTaskTable; +use Distantmagic\Resonance\ObservableTaskTableSlotStatusUpdateIterator; use Distantmagic\Resonance\RPCRequest; +use Distantmagic\Resonance\RPCResponse; use Distantmagic\Resonance\WebSocketAuthResolution; use Distantmagic\Resonance\WebSocketConnection; use Distantmagic\Resonance\WebSocketRPCResponder; use Psr\Log\LoggerInterface; -use RuntimeException; use WeakMap; /** @@ -24,15 +24,16 @@ use WeakMap; readonly class ObservableTasksTableUpdateResponder extends WebSocketRPCResponder { /** - * @var WeakMap<WebSocketConnection,LlamaCppCompletionIterator> + * @var WeakMap<WebSocketConnection,ObservableTaskTableSlotStatusUpdateIterator> */ private WeakMap $runningCompletions; public function __construct( private LoggerInterface $logger, + private ObservableTaskTable $observableTaskTable, ) { /** - * @var WeakMap<WebSocketConnection,LlamaCppCompletionIterator> + * @var WeakMap<WebSocketConnection,ObservableTaskTableSlotStatusUpdateIterator> */ $this->runningCompletions = new WeakMap(); } @@ -47,21 +48,28 @@ readonly class ObservableTasksTableUpdateResponder extends WebSocketRPCResponder WebSocketConnection $webSocketConnection, ): void { if ($this->runningCompletions->offsetExists($webSocketConnection)) { - $this->runningCompletions->offsetGet($webSocketConnection)->stop(); + $this->runningCompletions->offsetGet($webSocketConnection)->close(); } } - public function onNotification( - WebSocketAuthResolution $webSocketAuthResolution, - WebSocketConnection $webSocketConnection, - RPCNotification $rpcNotification, - ): never { - throw new RuntimeException('Unexpected notification'); - } - public function onRequest( WebSocketAuthResolution $webSocketAuthResolution, WebSocketConnection $webSocketConnection, RPCRequest $rpcRequest, - ): void {} + ): void { + $observableTaskUpdatesIterator = new ObservableTaskTableSlotStatusUpdateIterator($this->observableTaskTable); + + $this->runningCompletions->offsetSet($webSocketConnection, $observableTaskUpdatesIterator); + + foreach ($observableTaskUpdatesIterator as $observableTaskSlotStatusUpdate) { + if (!$webSocketConnection->status->isOpen()) { + break; + } + + $webSocketConnection->push(new RPCResponse( + rpcRequest: $rpcRequest, + content: $observableTaskSlotStatusUpdate, + )); + } + } }