diff --git a/config.ini.example b/config.ini.example index 25c10e8d87ecf6379c13386095c4e5027814ed43..6b294304abb5b431261937a9ef908868aa1b007f 100644 --- a/config.ini.example +++ b/config.ini.example @@ -44,6 +44,10 @@ session_key_authorization_request = oauth2.authorization_request session_key_pkce = oauth2.pkce session_key_state = oauth2.state +[observable_task] +max_tasks = 10000 +serialized_status_size = 32768 + [openapi] description = description title = title diff --git a/src/HttpResponder/ObservableTasksDashboard.php b/src/HttpResponder/ObservableTasksDashboard.php new file mode 100644 index 0000000000000000000000000000000000000000..daab39c5471ac20d702085f8d5c23e908c45f920 --- /dev/null +++ b/src/HttpResponder/ObservableTasksDashboard.php @@ -0,0 +1,19 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance\HttpResponder; + +use Distantmagic\Resonance\Attribute\Singleton; +use Distantmagic\Resonance\HttpResponder; +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; + +#[Singleton] +readonly class ObservableTasksDashboard extends HttpResponder +{ + public function respond(ServerRequestInterface $request, ResponseInterface $response): ResponseInterface + { + return $response->withBody($this->createStream('{"tasks": []}')); + } +} diff --git a/src/ObservableTaskSlotStatusUpdate.php b/src/ObservableTaskSlotStatusUpdate.php new file mode 100644 index 0000000000000000000000000000000000000000..9437b87de36908f21ce330b7f77cc7d5e7e673d2 --- /dev/null +++ b/src/ObservableTaskSlotStatusUpdate.php @@ -0,0 +1,19 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +/** + * @template TData + */ +readonly class ObservableTaskSlotStatusUpdate +{ + /** + * @param ObservableTaskStatusUpdate<TData> $observableTaskStatusUpdate + */ + public function __construct( + public string $slotId, + public ObservableTaskStatusUpdate $observableTaskStatusUpdate, + ) {} +} diff --git a/src/ObservableTaskTable.php b/src/ObservableTaskTable.php index 13cd1f86f46d5a62b2b5e40fcae9ffac9c78c1eb..5d8d2251257d3cdca6b6a92242c67ea58b1fc9b0 100644 --- a/src/ObservableTaskTable.php +++ b/src/ObservableTaskTable.php @@ -5,12 +5,18 @@ declare(strict_types=1); namespace Distantmagic\Resonance; use Distantmagic\Resonance\Attribute\Singleton; -use Generator; +use Ds\Set; +use Swoole\Coroutine\Channel; use Swoole\Table; #[Singleton] readonly class ObservableTaskTable { + /** + * @var Set<Channel> + */ + public Set $observableChannels; + private SwooleTableAvailableRowsPool $availableRowsPool; private string $serializedPendingStatus; private Table $table; @@ -20,7 +26,7 @@ readonly class ObservableTaskTable private SerializerInterface $serializer, ) { $this->availableRowsPool = new SwooleTableAvailableRowsPool($observableTaskConfiguration->maxTasks); - + $this->observableChannels = new Set(); $this->serializedPendingStatus = $serializer->serialize( new ObservableTaskStatusUpdate(ObservableTaskStatus::Pending, null) ); @@ -43,24 +49,6 @@ readonly class ObservableTaskTable return $this->unserializeTableRow($this->table->get($taskId)); } - /** - * @return Generator<non-empty-string,ObservableTaskStatusUpdate> - */ - public function getStatuses(): Generator - { - /** - * @var non-empty-string $taskId - * @var mixed $row explicitly mixed for typechecks - */ - foreach ($this->table as $taskId => $row) { - $status = $this->unserializeTableRow($row); - - if ($status instanceof ObservableTaskStatusUpdate) { - yield $taskId => $status; - } - } - } - /** * @return non-empty-string */ @@ -79,6 +67,14 @@ readonly class ObservableTaskTable 'status' => $this->serializer->serialize($statusUpdate), ]); + if (!$this->observableChannels->isEmpty()) { + $slotStatusUpdate = new ObservableTaskSlotStatusUpdate($slotId, $statusUpdate); + + foreach ($this->observableChannels as $observableChannel) { + $observableChannel->push($slotStatusUpdate); + } + } + if (ObservableTaskStatus::Running !== $statusUpdate->status) { break; } diff --git a/src/ObservableTaskTableSlotStatusUpdateIterator.php b/src/ObservableTaskTableSlotStatusUpdateIterator.php new file mode 100644 index 0000000000000000000000000000000000000000..af68dcceec5ef67d0faff60adc728d9db0aa5f21 --- /dev/null +++ b/src/ObservableTaskTableSlotStatusUpdateIterator.php @@ -0,0 +1,43 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +use Generator; +use IteratorAggregate; +use Swoole\Coroutine\Channel; + +/** + * @template-implements IteratorAggregate<ObservableTaskSlotStatusUpdate> + */ +readonly class ObservableTaskTableSlotStatusUpdateIterator implements IteratorAggregate +{ + public function __construct( + private ObservableTaskTable $observableTaskTable, + private float $timeout = -1, + ) {} + + /** + * @return Generator<ObservableTaskSlotStatusUpdate> + */ + public function getIterator(): Generator + { + $channel = new Channel(1); + + $this->observableTaskTable->observableChannels->add($channel); + + try { + /** + * @var ObservableTaskSlotStatusUpdate $observableTaskSlotStatusUpdate + */ + foreach (new SwooleChannelIterator($channel, $this->timeout) as $observableTaskSlotStatusUpdate) { + yield $observableTaskSlotStatusUpdate; + } + } finally { + $this->observableTaskTable->observableChannels->remove($channel); + } + + $channel->close(); + } +} diff --git a/src/ObservableTaskTableSlotStatusUpdateIteratorTest.php b/src/ObservableTaskTableSlotStatusUpdateIteratorTest.php new file mode 100644 index 0000000000000000000000000000000000000000..cb50155e7df51641581cdeb2019e3fe381a04c8c --- /dev/null +++ b/src/ObservableTaskTableSlotStatusUpdateIteratorTest.php @@ -0,0 +1,87 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +use Distantmagic\Resonance\Serializer\Vanilla; +use PHPUnit\Framework\Attributes\CoversClass; +use PHPUnit\Framework\TestCase; +use Swoole\Coroutine\WaitGroup; +use Swoole\Event; + +/** + * @internal + */ +#[CoversClass(ObservableTaskTableSlotStatusUpdateIterator::class)] +final class ObservableTaskTableSlotStatusUpdateIteratorTest extends TestCase +{ + private ?ObservableTaskConfiguration $observableTaskConfiguration = null; + private ?ObservableTaskTable $observableTaskTable = null; + + protected function setUp(): void + { + $this->observableTaskConfiguration = new ObservableTaskConfiguration( + maxTasks: 4, + serializedStatusSize: 32768, + ); + + $this->observableTaskTable = new ObservableTaskTable( + observableTaskConfiguration: $this->observableTaskConfiguration, + serializer: new Vanilla(), + ); + } + + protected function tearDown(): void + { + Event::wait(); + } + + public function test_channel_is_observed(): void + { + SwooleCoroutineHelper::mustRun(function () { + $wg = new WaitGroup(); + + $observableTask = new ObservableTask(static function () { + yield new ObservableTaskStatusUpdate( + ObservableTaskStatus::Running, + 'test1', + ); + + yield new ObservableTaskStatusUpdate( + ObservableTaskStatus::Finished, + 'test2', + ); + }); + + $wg->add(); + + SwooleCoroutineHelper::mustGo(function () use ($wg) { + self::assertNotNull($this->observableTaskTable); + + try { + $iterator = new ObservableTaskTableSlotStatusUpdateIterator($this->observableTaskTable); + + foreach ($iterator as $statusUpdate) { + self::assertInstanceOf(ObservableTaskSlotStatusUpdate::class, $statusUpdate); + self::assertEquals('0', $statusUpdate->slotId); + + if (ObservableTaskStatus::Finished === $statusUpdate->observableTaskStatusUpdate->status) { + self::assertEquals('test2', $statusUpdate->observableTaskStatusUpdate->data); + + break; + } + + self::assertEquals('test1', $statusUpdate->observableTaskStatusUpdate->data); + } + } finally { + $wg->done(); + } + }); + + $this->observableTaskTable?->observe($observableTask); + + $wg->wait(); + }); + } +} diff --git a/src/ObservableTaskTableTest.php b/src/ObservableTaskTableTest.php index 13ce0cb1c94ac9c1e0404d0c2e1110ebab97c300..090ac1f2b42ca9b34648332b1529c67f738be139 100644 --- a/src/ObservableTaskTableTest.php +++ b/src/ObservableTaskTableTest.php @@ -7,6 +7,8 @@ namespace Distantmagic\Resonance; use Distantmagic\Resonance\Serializer\Vanilla; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; +use Swoole\Coroutine\Channel; +use Swoole\Coroutine\WaitGroup; use Swoole\Event; /** @@ -15,23 +17,75 @@ use Swoole\Event; #[CoversClass(ObservableTaskTable::class)] final class ObservableTaskTableTest extends TestCase { - protected function tearDown(): void - { - Event::wait(); - } + private ?ObservableTaskConfiguration $observableTaskConfiguration = null; + private ?ObservableTaskTable $observableTaskTable = null; - public function test_address_is_trusted(): void + protected function setUp(): void { - $observableTaskConfiguration = new ObservableTaskConfiguration( + $this->observableTaskConfiguration = new ObservableTaskConfiguration( maxTasks: 4, serializedStatusSize: 32768, ); - $observableTaskTable = new ObservableTaskTable( - observableTaskConfiguration: $observableTaskConfiguration, + $this->observableTaskTable = new ObservableTaskTable( + observableTaskConfiguration: $this->observableTaskConfiguration, serializer: new Vanilla(), ); + } + + protected function tearDown(): void + { + Event::wait(); + } + + public function test_channel_is_observed(): void + { + SwooleCoroutineHelper::mustRun(function () { + $channel = new Channel(); + $wg = new WaitGroup(); + + $this->observableTaskTable?->observableChannels->add($channel); + $observableTask = new ObservableTask(static function () { + yield new ObservableTaskStatusUpdate( + ObservableTaskStatus::Running, + 'test1', + ); + + yield new ObservableTaskStatusUpdate( + ObservableTaskStatus::Finished, + 'test2', + ); + }); + + $wg->add(); + + SwooleCoroutineHelper::mustGo(static function () use ($channel, $wg) { + try { + $status1 = $channel->pop(); + + self::assertInstanceOf(ObservableTaskSlotStatusUpdate::class, $status1); + self::assertSame(ObservableTaskStatus::Running, $status1->observableTaskStatusUpdate->status); + + $status2 = $channel->pop(); + + self::assertInstanceOf(ObservableTaskSlotStatusUpdate::class, $status2); + self::assertSame(ObservableTaskStatus::Finished, $status2->observableTaskStatusUpdate->status); + } finally { + $wg->done(); + } + }); + + $this->observableTaskTable?->observe($observableTask); + + $wg->wait(); + + $this->observableTaskTable?->observableChannels->remove($channel); + }); + } + + public function test_task_is_observed(): void + { $observableTask = new ObservableTask(static function () { yield new ObservableTaskStatusUpdate( ObservableTaskStatus::Running, @@ -39,13 +93,13 @@ final class ObservableTaskTableTest extends TestCase ); }); - self::assertNull($observableTaskTable->getStatus('0')); + self::assertNull($this->observableTaskTable?->getStatus('0')); - $slotId = $observableTaskTable->observe($observableTask); + $slotId = $this->observableTaskTable?->observe($observableTask); self::assertSame('0', $slotId); - $status = $observableTaskTable->getStatus($slotId); + $status = $this->observableTaskTable?->getStatus($slotId); self::assertInstanceOf(ObservableTaskStatusUpdate::class, $status); self::assertSame(ObservableTaskStatus::Running, $status->status); diff --git a/src/SwooleTableAvailableRowsPoolTest.php b/src/SwooleTableAvailableRowsPoolTest.php index 2b14f4d02b849f825fb470be9f9c434a6d36d792..603814c227ccf5041c95b4e0890c759bd569e328 100644 --- a/src/SwooleTableAvailableRowsPoolTest.php +++ b/src/SwooleTableAvailableRowsPoolTest.php @@ -13,7 +13,7 @@ use PHPUnit\Framework\TestCase; #[CoversClass(SwooleTableAvailableRowsPool::class)] final class SwooleTableAvailableRowsPoolTest extends TestCase { - public function test_address_is_trusted(): void + public function test_rows_pool_is_maintained(): void { $availableRowsPool = new SwooleTableAvailableRowsPool(4); diff --git a/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php b/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php new file mode 100644 index 0000000000000000000000000000000000000000..40b4ec66f55c3f3492ce3a932a87efd81f516596 --- /dev/null +++ b/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php @@ -0,0 +1,67 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance\WebSocketRPCResponder; + +use Distantmagic\Resonance\Constraint; +use Distantmagic\Resonance\Constraint\ObjectConstraint; +use Distantmagic\Resonance\LlamaCppCompletionIterator; +use Distantmagic\Resonance\RPCNotification; +use Distantmagic\Resonance\RPCRequest; +use Distantmagic\Resonance\WebSocketAuthResolution; +use Distantmagic\Resonance\WebSocketConnection; +use Distantmagic\Resonance\WebSocketRPCResponder; +use Psr\Log\LoggerInterface; +use RuntimeException; +use WeakMap; + +/** + * @template TPayload + * + * @template-extends WebSocketRPCResponder<TPayload> + */ +readonly class ObservableTasksTableUpdateResponder extends WebSocketRPCResponder +{ + /** + * @var WeakMap<WebSocketConnection,LlamaCppCompletionIterator> + */ + private WeakMap $runningCompletions; + + public function __construct( + private LoggerInterface $logger, + ) { + /** + * @var WeakMap<WebSocketConnection,LlamaCppCompletionIterator> + */ + $this->runningCompletions = new WeakMap(); + } + + public function getConstraint(): Constraint + { + return new ObjectConstraint(); + } + + public function onBeforeMessage( + WebSocketAuthResolution $webSocketAuthResolution, + WebSocketConnection $webSocketConnection, + ): void { + if ($this->runningCompletions->offsetExists($webSocketConnection)) { + $this->runningCompletions->offsetGet($webSocketConnection)->stop(); + } + } + + public function onNotification( + WebSocketAuthResolution $webSocketAuthResolution, + WebSocketConnection $webSocketConnection, + RPCNotification $rpcNotification, + ): never { + throw new RuntimeException('Unexpected notification'); + } + + public function onRequest( + WebSocketAuthResolution $webSocketAuthResolution, + WebSocketConnection $webSocketConnection, + RPCRequest $rpcRequest, + ): void {} +} diff --git a/src/WebSocketServerConnectionTable.php b/src/WebSocketServerConnectionTable.php index 268b690d411e812b74ae48bb58dda07b8cd200aa..c9bb0ecf7ee075db20aac887286c2b9a21cde5ea 100644 --- a/src/WebSocketServerConnectionTable.php +++ b/src/WebSocketServerConnectionTable.php @@ -22,7 +22,7 @@ readonly class WebSocketServerConnectionTable implements IteratorAggregate public function __construct(WebSocketConfiguration $webSocketConfiguration) { - $this->table = new Table($webSocketConfiguration->maxConnections); + $this->table = new Table(2 * $webSocketConfiguration->maxConnections); $this->table->column('worker_id', Table::TYPE_INT); $this->table->create(); }