Skip to content
Snippets Groups Projects
Commit 02c55fe1 authored by Mateusz Charytoniuk's avatar Mateusz Charytoniuk
Browse files

feat: observable tasks iterator

parent fedca5a7
No related branches found
No related tags found
No related merge requests found
...@@ -5,15 +5,29 @@ declare(strict_types=1); ...@@ -5,15 +5,29 @@ declare(strict_types=1);
namespace Distantmagic\Resonance\HttpResponder; namespace Distantmagic\Resonance\HttpResponder;
use Distantmagic\Resonance\Attribute\Singleton; use Distantmagic\Resonance\Attribute\Singleton;
use Distantmagic\Resonance\HttpInterceptableInterface;
use Distantmagic\Resonance\HttpResponder; use Distantmagic\Resonance\HttpResponder;
use Distantmagic\Resonance\ObservableTaskTable;
use Distantmagic\Resonance\TwigTemplate;
use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Message\ServerRequestInterface;
#[Singleton] #[Singleton]
readonly class ObservableTasksDashboard extends HttpResponder readonly class ObservableTasksDashboard extends HttpResponder
{ {
public function respond(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface public function __construct(
private ObservableTaskTable $observableTaskTable,
) {}
public function respond(ServerRequestInterface $request, ResponseInterface $response): HttpInterceptableInterface
{ {
return $response->withBody($this->createStream('{"tasks": []}')); return new TwigTemplate(
$request,
$response,
'@resonance/observable_tasks_dashboard.twig',
[
'observableTaskTable' => $this->observableTaskTable,
]
);
} }
} }
...@@ -4,10 +4,12 @@ declare(strict_types=1); ...@@ -4,10 +4,12 @@ declare(strict_types=1);
namespace Distantmagic\Resonance; namespace Distantmagic\Resonance;
use JsonSerializable;
/** /**
* @template TData * @template TData
*/ */
readonly class ObservableTaskSlotStatusUpdate readonly class ObservableTaskSlotStatusUpdate implements JsonSerializable
{ {
/** /**
* @param ObservableTaskStatusUpdate<TData> $observableTaskStatusUpdate * @param ObservableTaskStatusUpdate<TData> $observableTaskStatusUpdate
...@@ -16,4 +18,12 @@ readonly class ObservableTaskSlotStatusUpdate ...@@ -16,4 +18,12 @@ readonly class ObservableTaskSlotStatusUpdate
public string $slotId, public string $slotId,
public ObservableTaskStatusUpdate $observableTaskStatusUpdate, public ObservableTaskStatusUpdate $observableTaskStatusUpdate,
) {} ) {}
public function jsonSerialize(): array
{
return [
'slot_id' => $this->slotId,
'observable_task_status_update' => $this->observableTaskStatusUpdate,
];
}
} }
...@@ -4,10 +4,12 @@ declare(strict_types=1); ...@@ -4,10 +4,12 @@ declare(strict_types=1);
namespace Distantmagic\Resonance; namespace Distantmagic\Resonance;
use JsonSerializable;
/** /**
* @template TData * @template TData
*/ */
readonly class ObservableTaskStatusUpdate readonly class ObservableTaskStatusUpdate implements JsonSerializable
{ {
/** /**
* @param TData $data * @param TData $data
...@@ -16,4 +18,12 @@ readonly class ObservableTaskStatusUpdate ...@@ -16,4 +18,12 @@ readonly class ObservableTaskStatusUpdate
public ObservableTaskStatus $status, public ObservableTaskStatus $status,
public mixed $data public mixed $data
) {} ) {}
public function jsonSerialize(): array
{
return [
'status' => $this->status->value,
'data' => $this->data,
];
}
} }
...@@ -6,11 +6,16 @@ namespace Distantmagic\Resonance; ...@@ -6,11 +6,16 @@ namespace Distantmagic\Resonance;
use Distantmagic\Resonance\Attribute\Singleton; use Distantmagic\Resonance\Attribute\Singleton;
use Ds\Set; use Ds\Set;
use Generator;
use IteratorAggregate;
use Swoole\Coroutine\Channel; use Swoole\Coroutine\Channel;
use Swoole\Table; use Swoole\Table;
/**
* @template-implements IteratorAggregate<non-empty-string,?ObservableTaskStatusUpdate>
*/
#[Singleton] #[Singleton]
readonly class ObservableTaskTable readonly class ObservableTaskTable implements IteratorAggregate
{ {
/** /**
* @var Set<Channel> * @var Set<Channel>
...@@ -41,6 +46,20 @@ readonly class ObservableTaskTable ...@@ -41,6 +46,20 @@ readonly class ObservableTaskTable
$this->table->destroy(); $this->table->destroy();
} }
/**
* @return Generator<non-empty-string,?ObservableTaskStatusUpdate>
*/
public function getIterator(): Generator
{
/**
* @var non-empty-string $slotId
* @var mixed $row explicitly mixed for typechecks
*/
foreach ($this->table as $slotId => $row) {
yield $slotId => $this->unserializeTableRow($row);
}
}
/** /**
* @param non-empty-string $taskId * @param non-empty-string $taskId
*/ */
......
...@@ -4,6 +4,7 @@ declare(strict_types=1); ...@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace Distantmagic\Resonance; namespace Distantmagic\Resonance;
use Ds\Set;
use Generator; use Generator;
use IteratorAggregate; use IteratorAggregate;
use Swoole\Coroutine\Channel; use Swoole\Coroutine\Channel;
...@@ -13,10 +14,24 @@ use Swoole\Coroutine\Channel; ...@@ -13,10 +14,24 @@ use Swoole\Coroutine\Channel;
*/ */
readonly class ObservableTaskTableSlotStatusUpdateIterator implements IteratorAggregate readonly class ObservableTaskTableSlotStatusUpdateIterator implements IteratorAggregate
{ {
/**
* @var Set<SwooleChannelIterator>
*/
private Set $swooleChannelIterators;
public function __construct( public function __construct(
private ObservableTaskTable $observableTaskTable, private ObservableTaskTable $observableTaskTable,
private float $timeout = -1, private float $timeout = -1,
) {} ) {
$this->swooleChannelIterators = new Set();
}
public function close(): void
{
foreach ($this->swooleChannelIterators as $swooleChannelIterator) {
$swooleChannelIterator->close();
}
}
/** /**
* @return Generator<ObservableTaskSlotStatusUpdate> * @return Generator<ObservableTaskSlotStatusUpdate>
...@@ -28,12 +43,18 @@ readonly class ObservableTaskTableSlotStatusUpdateIterator implements IteratorAg ...@@ -28,12 +43,18 @@ readonly class ObservableTaskTableSlotStatusUpdateIterator implements IteratorAg
$this->observableTaskTable->observableChannels->add($channel); $this->observableTaskTable->observableChannels->add($channel);
try { try {
$swooleChannelIterator = new SwooleChannelIterator($channel, $this->timeout);
$this->swooleChannelIterators->add($swooleChannelIterator);
/** /**
* @var ObservableTaskSlotStatusUpdate $observableTaskSlotStatusUpdate * @var ObservableTaskSlotStatusUpdate $observableTaskSlotStatusUpdate
*/ */
foreach (new SwooleChannelIterator($channel, $this->timeout) as $observableTaskSlotStatusUpdate) { foreach ($swooleChannelIterator as $observableTaskSlotStatusUpdate) {
yield $observableTaskSlotStatusUpdate; yield $observableTaskSlotStatusUpdate;
} }
$this->swooleChannelIterators->remove($swooleChannelIterator);
} finally { } finally {
$this->observableTaskTable->observableChannels->remove($channel); $this->observableTaskTable->observableChannels->remove($channel);
} }
......
...@@ -22,6 +22,7 @@ use Twig\Cache\FilesystemCache; ...@@ -22,6 +22,7 @@ use Twig\Cache\FilesystemCache;
use Twig\Environment as TwigEnvironment; use Twig\Environment as TwigEnvironment;
use Twig\Error\Error; use Twig\Error\Error;
use Twig\Extension\ExtensionInterface; use Twig\Extension\ExtensionInterface;
use Twig\Loader\FilesystemLoader;
/** /**
* @template-extends SingletonProvider<TwigEnvironment> * @template-extends SingletonProvider<TwigEnvironment>
...@@ -80,8 +81,15 @@ final readonly class TwigEnvironmentProvider extends SingletonProvider ...@@ -80,8 +81,15 @@ final readonly class TwigEnvironmentProvider extends SingletonProvider
private function warmupCache(TwigEnvironment $twigEnvironment): void private function warmupCache(TwigEnvironment $twigEnvironment): void
{ {
$viewsDirectory = DM_APP_ROOT.'/views'; $this->warmupCacheDirectory($twigEnvironment, DM_APP_ROOT.'/views', FilesystemLoader::MAIN_NAMESPACE);
$this->warmupCacheDirectory($twigEnvironment, DM_RESONANCE_ROOT.'/views', 'resonance');
}
private function warmupCacheDirectory(
TwigEnvironment $twigEnvironment,
string $viewsDirectory,
string $namespace,
): void {
if (!is_dir($viewsDirectory)) { if (!is_dir($viewsDirectory)) {
return; return;
} }
...@@ -99,7 +107,11 @@ final readonly class TwigEnvironmentProvider extends SingletonProvider ...@@ -99,7 +107,11 @@ final readonly class TwigEnvironmentProvider extends SingletonProvider
$relativePathname = $file->getRelativePathname(); $relativePathname = $file->getRelativePathname();
try { try {
$twigEnvironment->load($relativePathname); $twigEnvironment->load(sprintf(
'@%s/%s',
$namespace,
$relativePathname,
));
} catch (Error $error) { } catch (Error $error) {
$this->logger->warning(sprintf( $this->logger->warning(sprintf(
'twig_cache_warmup_error("%s", %d): %s', 'twig_cache_warmup_error("%s", %d): %s',
......
...@@ -11,14 +11,17 @@ use Distantmagic\Resonance\LlamaCppCompletionRequest; ...@@ -11,14 +11,17 @@ use Distantmagic\Resonance\LlamaCppCompletionRequest;
use Distantmagic\Resonance\LlmPrompt\SubjectActionPrompt; use Distantmagic\Resonance\LlmPrompt\SubjectActionPrompt;
use Distantmagic\Resonance\LlmPromptTemplate; use Distantmagic\Resonance\LlmPromptTemplate;
use Distantmagic\Resonance\LlmPromptTemplate\ChainPrompt; use Distantmagic\Resonance\LlmPromptTemplate\ChainPrompt;
use Distantmagic\Resonance\ObservableTask;
use Distantmagic\Resonance\ObservableTaskStatus;
use Distantmagic\Resonance\ObservableTaskStatusUpdate;
use Distantmagic\Resonance\ObservableTaskTable;
use Distantmagic\Resonance\PromptSubjectResponderAggregate; use Distantmagic\Resonance\PromptSubjectResponderAggregate;
use Distantmagic\Resonance\RPCNotification;
use Distantmagic\Resonance\RPCRequest; use Distantmagic\Resonance\RPCRequest;
use Distantmagic\Resonance\WebSocketAuthResolution; use Distantmagic\Resonance\WebSocketAuthResolution;
use Distantmagic\Resonance\WebSocketConnection; use Distantmagic\Resonance\WebSocketConnection;
use Distantmagic\Resonance\WebSocketRPCResponder; use Distantmagic\Resonance\WebSocketRPCResponder;
use Generator;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use RuntimeException;
use WeakMap; use WeakMap;
/** /**
...@@ -51,6 +54,7 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketRP ...@@ -51,6 +54,7 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketRP
public function __construct( public function __construct(
private LlamaCppClient $llamaCppClient, private LlamaCppClient $llamaCppClient,
private LoggerInterface $logger, private LoggerInterface $logger,
private ObservableTaskTable $observableTaskTable,
private PromptSubjectResponderAggregate $promptSubjectResponderAggregate, private PromptSubjectResponderAggregate $promptSubjectResponderAggregate,
private SubjectActionGrammar $subjectActionGrammar, private SubjectActionGrammar $subjectActionGrammar,
private SubjectActionPrompt $subjectActionPrompt, private SubjectActionPrompt $subjectActionPrompt,
...@@ -70,15 +74,39 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketRP ...@@ -70,15 +74,39 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketRP
} }
} }
public function onNotification( public function onRequest(
WebSocketAuthResolution $webSocketAuthResolution, WebSocketAuthResolution $webSocketAuthResolution,
WebSocketConnection $webSocketConnection, WebSocketConnection $webSocketConnection,
RPCNotification $rpcNotification, RPCRequest $rpcRequest,
): never { ): void {
throw new RuntimeException('Unexpected notification'); $this->observableTaskTable->observe(new ObservableTask(
/**
* @return Generator<ObservableTaskStatusUpdate>
*/
function () use (
$webSocketAuthResolution,
$webSocketConnection,
$rpcRequest,
): Generator {
yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Running, null);
try {
$this->onObservableRequest(
$webSocketAuthResolution,
$webSocketConnection,
$rpcRequest,
);
} finally {
yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Finished, null);
}
}
));
} }
public function onRequest( /**
* @param RPCRequest<TPayload> $rpcRequest
*/
private function onObservableRequest(
WebSocketAuthResolution $webSocketAuthResolution, WebSocketAuthResolution $webSocketAuthResolution,
WebSocketConnection $webSocketConnection, WebSocketConnection $webSocketConnection,
RPCRequest $rpcRequest, RPCRequest $rpcRequest,
...@@ -95,8 +123,7 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketRP ...@@ -95,8 +123,7 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketRP
$this->runningCompletions->offsetSet($webSocketConnection, $completion); $this->runningCompletions->offsetSet($webSocketConnection, $completion);
$response = $this $response = $this->promptSubjectResponderAggregate
->promptSubjectResponderAggregate
->createResponseFromTokens( ->createResponseFromTokens(
authenticatedUser: $webSocketAuthResolution->authenticatedUser, authenticatedUser: $webSocketAuthResolution->authenticatedUser,
completion: $completion, completion: $completion,
......
...@@ -6,14 +6,14 @@ namespace Distantmagic\Resonance\WebSocketRPCResponder; ...@@ -6,14 +6,14 @@ namespace Distantmagic\Resonance\WebSocketRPCResponder;
use Distantmagic\Resonance\Constraint; use Distantmagic\Resonance\Constraint;
use Distantmagic\Resonance\Constraint\ObjectConstraint; use Distantmagic\Resonance\Constraint\ObjectConstraint;
use Distantmagic\Resonance\LlamaCppCompletionIterator; use Distantmagic\Resonance\ObservableTaskTable;
use Distantmagic\Resonance\RPCNotification; use Distantmagic\Resonance\ObservableTaskTableSlotStatusUpdateIterator;
use Distantmagic\Resonance\RPCRequest; use Distantmagic\Resonance\RPCRequest;
use Distantmagic\Resonance\RPCResponse;
use Distantmagic\Resonance\WebSocketAuthResolution; use Distantmagic\Resonance\WebSocketAuthResolution;
use Distantmagic\Resonance\WebSocketConnection; use Distantmagic\Resonance\WebSocketConnection;
use Distantmagic\Resonance\WebSocketRPCResponder; use Distantmagic\Resonance\WebSocketRPCResponder;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use RuntimeException;
use WeakMap; use WeakMap;
/** /**
...@@ -24,15 +24,16 @@ use WeakMap; ...@@ -24,15 +24,16 @@ use WeakMap;
readonly class ObservableTasksTableUpdateResponder extends WebSocketRPCResponder readonly class ObservableTasksTableUpdateResponder extends WebSocketRPCResponder
{ {
/** /**
* @var WeakMap<WebSocketConnection,LlamaCppCompletionIterator> * @var WeakMap<WebSocketConnection,ObservableTaskTableSlotStatusUpdateIterator>
*/ */
private WeakMap $runningCompletions; private WeakMap $runningCompletions;
public function __construct( public function __construct(
private LoggerInterface $logger, private LoggerInterface $logger,
private ObservableTaskTable $observableTaskTable,
) { ) {
/** /**
* @var WeakMap<WebSocketConnection,LlamaCppCompletionIterator> * @var WeakMap<WebSocketConnection,ObservableTaskTableSlotStatusUpdateIterator>
*/ */
$this->runningCompletions = new WeakMap(); $this->runningCompletions = new WeakMap();
} }
...@@ -47,21 +48,28 @@ readonly class ObservableTasksTableUpdateResponder extends WebSocketRPCResponder ...@@ -47,21 +48,28 @@ readonly class ObservableTasksTableUpdateResponder extends WebSocketRPCResponder
WebSocketConnection $webSocketConnection, WebSocketConnection $webSocketConnection,
): void { ): void {
if ($this->runningCompletions->offsetExists($webSocketConnection)) { if ($this->runningCompletions->offsetExists($webSocketConnection)) {
$this->runningCompletions->offsetGet($webSocketConnection)->stop(); $this->runningCompletions->offsetGet($webSocketConnection)->close();
} }
} }
public function onNotification(
WebSocketAuthResolution $webSocketAuthResolution,
WebSocketConnection $webSocketConnection,
RPCNotification $rpcNotification,
): never {
throw new RuntimeException('Unexpected notification');
}
public function onRequest( public function onRequest(
WebSocketAuthResolution $webSocketAuthResolution, WebSocketAuthResolution $webSocketAuthResolution,
WebSocketConnection $webSocketConnection, WebSocketConnection $webSocketConnection,
RPCRequest $rpcRequest, RPCRequest $rpcRequest,
): void {} ): void {
$observableTaskUpdatesIterator = new ObservableTaskTableSlotStatusUpdateIterator($this->observableTaskTable);
$this->runningCompletions->offsetSet($webSocketConnection, $observableTaskUpdatesIterator);
foreach ($observableTaskUpdatesIterator as $observableTaskSlotStatusUpdate) {
if (!$webSocketConnection->status->isOpen()) {
break;
}
$webSocketConnection->push(new RPCResponse(
rpcRequest: $rpcRequest,
content: $observableTaskSlotStatusUpdate,
));
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment