diff --git a/phpunit.xml b/phpunit.xml index cef418a3fddc3f185ef481cfc8e03f24e051d526..3fd52b7b5dd35b6557b7623e0e9dcedab5d824b7 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -2,10 +2,11 @@ <phpunit beStrictAboutCoverageMetadata="true" beStrictAboutChangesToGlobalState="true" - beStrictAboutTestsThatDoNotTestAnything="true" + beStrictAboutTestsThatDoNotTestAnything="false" bootstrap="phpunit_bootstrap.php" colors="true" displayDetailsOnTestsThatTriggerWarnings="true" + processIsolation="false" > <testsuites> <testsuite name="Unit"> @@ -17,4 +18,8 @@ <directory suffix=".php">src</directory> </include> </source> + <extensions> + <bootstrap class="Distantmagic\Resonance\PHPUnitSwooleCoroutineExtension"> + </bootstrap> + </extensions> </phpunit> diff --git a/src/ObservableTaskTable.php b/src/ObservableTaskTable.php index dd32771909c84ac5c8a5c031284db9a743fe27bd..d88e9b49b13b239b59c88c60a65504b37bbf8e1d 100644 --- a/src/ObservableTaskTable.php +++ b/src/ObservableTaskTable.php @@ -10,7 +10,6 @@ use Generator; use IteratorAggregate; use RuntimeException; use Swoole\Coroutine; -use Swoole\Coroutine\Channel; use Swoole\Table; /** @@ -20,9 +19,9 @@ use Swoole\Table; readonly class ObservableTaskTable implements IteratorAggregate { /** - * @var Set<Channel> + * @var Set<callable(ObservableTaskSlotStatusUpdate):bool> */ - public Set $observableChannels; + public Set $observers; private SwooleTableAvailableRowsPool $availableRowsPool; private string $serializedPendingStatus; @@ -33,7 +32,7 @@ readonly class ObservableTaskTable implements IteratorAggregate private SerializerInterface $serializer, ) { $this->availableRowsPool = new SwooleTableAvailableRowsPool($observableTaskConfiguration->maxTasks); - $this->observableChannels = new Set(); + $this->observers = new Set(); $this->serializedPendingStatus = $serializer->serialize( new ObservableTaskStatusUpdate(ObservableTaskStatus::Pending, null) ); @@ -87,17 +86,25 @@ readonly class ObservableTaskTable implements IteratorAggregate foreach ($observableTask as $statusUpdate) { if (!$this->table->set($slotId, [ - 'status' => $this->serializer->serialize($statusUpdate), - ]) + 'status' => $this->serializer->serialize($statusUpdate), + ]) ) { throw new RuntimeException('Unable to update a slot status.'); } - if (!$this->observableChannels->isEmpty()) { + if (!$this->observers->isEmpty()) { $slotStatusUpdate = new ObservableTaskSlotStatusUpdate($slotId, $statusUpdate); - foreach ($this->observableChannels as $observableChannel) { - $observableChannel->push($slotStatusUpdate); + foreach ($this->observers as $observer) { + if (!is_callable($observer)) { + throw new RuntimeException('Observer is not callable'); + } + + SwooleCoroutineHelper::mustGo(function () use ($observer, $slotStatusUpdate) { + if (false === $observer($slotStatusUpdate)) { + $this->observers->remove($observer); + } + }); } } diff --git a/src/ObservableTaskTableSlotStatusUpdateIterator.php b/src/ObservableTaskTableSlotStatusUpdateIterator.php deleted file mode 100644 index 29bc4102d1d1d38aea86d040bd36fc07a6df6f21..0000000000000000000000000000000000000000 --- a/src/ObservableTaskTableSlotStatusUpdateIterator.php +++ /dev/null @@ -1,64 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Distantmagic\Resonance; - -use Ds\Set; -use Generator; -use IteratorAggregate; -use Swoole\Coroutine\Channel; - -/** - * @template-implements IteratorAggregate<ObservableTaskSlotStatusUpdate> - */ -readonly class ObservableTaskTableSlotStatusUpdateIterator implements IteratorAggregate -{ - /** - * @var Set<SwooleChannelIterator> - */ - private Set $swooleChannelIterators; - - public function __construct( - private ObservableTaskTable $observableTaskTable, - private float $timeout = -1, - ) { - $this->swooleChannelIterators = new Set(); - } - - public function close(): void - { - foreach ($this->swooleChannelIterators as $swooleChannelIterator) { - $swooleChannelIterator->close(); - } - } - - /** - * @return Generator<ObservableTaskSlotStatusUpdate> - */ - public function getIterator(): Generator - { - $channel = new Channel(1); - - $this->observableTaskTable->observableChannels->add($channel); - - try { - $swooleChannelIterator = new SwooleChannelIterator($channel, $this->timeout); - - $this->swooleChannelIterators->add($swooleChannelIterator); - - /** - * @var ObservableTaskSlotStatusUpdate $observableTaskSlotStatusUpdate - */ - foreach ($swooleChannelIterator as $observableTaskSlotStatusUpdate) { - yield $observableTaskSlotStatusUpdate; - } - - $this->swooleChannelIterators->remove($swooleChannelIterator); - } finally { - $this->observableTaskTable->observableChannels->remove($channel); - } - - $channel->close(); - } -} diff --git a/src/ObservableTaskTableSlotStatusUpdateIteratorTest.php b/src/ObservableTaskTableSlotStatusUpdateIteratorTest.php deleted file mode 100644 index cbbabe07f3ff28c682fa7c76708759f0ad74feb6..0000000000000000000000000000000000000000 --- a/src/ObservableTaskTableSlotStatusUpdateIteratorTest.php +++ /dev/null @@ -1,74 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Distantmagic\Resonance; - -use Distantmagic\Resonance\Serializer\Vanilla; -use PHPUnit\Framework\Attributes\CoversClass; -use PHPUnit\Framework\TestCase; -use Swoole\Event; - -/** - * @internal - */ -#[CoversClass(ObservableTaskTableSlotStatusUpdateIterator::class)] -final class ObservableTaskTableSlotStatusUpdateIteratorTest extends TestCase -{ - private ?ObservableTaskConfiguration $observableTaskConfiguration = null; - private ?ObservableTaskTable $observableTaskTable = null; - - protected function setUp(): void - { - $this->observableTaskConfiguration = new ObservableTaskConfiguration( - maxTasks: 4, - serializedStatusSize: 32768, - ); - - $this->observableTaskTable = new ObservableTaskTable( - observableTaskConfiguration: $this->observableTaskConfiguration, - serializer: new Vanilla(), - ); - } - - protected function tearDown(): void - { - Event::wait(); - } - - public function test_channel_is_observed(): void - { - $observableTask = new ObservableTask(static function () { - yield new ObservableTaskStatusUpdate( - ObservableTaskStatus::Running, - 'test1', - ); - - yield new ObservableTaskStatusUpdate( - ObservableTaskStatus::Finished, - 'test2', - ); - }); - - SwooleCoroutineHelper::mustGo(function () { - self::assertNotNull($this->observableTaskTable); - - $iterator = new ObservableTaskTableSlotStatusUpdateIterator($this->observableTaskTable); - - foreach ($iterator as $statusUpdate) { - self::assertInstanceOf(ObservableTaskSlotStatusUpdate::class, $statusUpdate); - self::assertEquals('0', $statusUpdate->slotId); - - if (ObservableTaskStatus::Finished === $statusUpdate->observableTaskStatusUpdate->status) { - self::assertEquals('test2', $statusUpdate->observableTaskStatusUpdate->data); - - break; - } - - self::assertEquals('test1', $statusUpdate->observableTaskStatusUpdate->data); - } - }); - - $this->observableTaskTable?->observe($observableTask); - } -} diff --git a/src/ObservableTaskTableTest.php b/src/ObservableTaskTableTest.php index 118339c3a2644890447abb62591609f1d89d4c1e..f0810c557786f75b6c727b6079894b2a3d53908e 100644 --- a/src/ObservableTaskTableTest.php +++ b/src/ObservableTaskTableTest.php @@ -7,9 +7,6 @@ namespace Distantmagic\Resonance; use Distantmagic\Resonance\Serializer\Vanilla; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; -use Swoole\Coroutine; -use Swoole\Coroutine\Channel; -use Swoole\Coroutine\WaitGroup; use Swoole\Event; /** @@ -41,12 +38,13 @@ final class ObservableTaskTableTest extends TestCase public function test_channel_is_observed(): void { - $channel = new Channel(); - $wg = new WaitGroup(); + self::assertNotNull($this->observableTaskTable); - $this->observableTaskTable?->observableChannels->add($channel); + $this->observableTaskTable->observers->add(static function (ObservableTaskSlotStatusUpdate $status): bool { + return ObservableTaskStatus::Finished === $status->observableTaskStatusUpdate->status; + }); - $observableTask = new ObservableTask(static function () { + $this->observableTaskTable->observe(new ObservableTask(static function () { yield new ObservableTaskStatusUpdate( ObservableTaskStatus::Running, 'test1', @@ -56,49 +54,24 @@ final class ObservableTaskTableTest extends TestCase ObservableTaskStatus::Finished, 'test2', ); - }); - - $wg->add(); - - SwooleCoroutineHelper::mustGo(static function () use ($channel, $wg) { - Coroutine::defer(static function () use ($wg) { - $wg->done(); - }); - - $status1 = $channel->pop(); - - self::assertInstanceOf(ObservableTaskSlotStatusUpdate::class, $status1); - self::assertSame(ObservableTaskStatus::Running, $status1->observableTaskStatusUpdate->status); - - $status2 = $channel->pop(); - - self::assertInstanceOf(ObservableTaskSlotStatusUpdate::class, $status2); - self::assertSame(ObservableTaskStatus::Finished, $status2->observableTaskStatusUpdate->status); - }); - - $this->observableTaskTable?->observe($observableTask); - - $wg->wait(); - - $this->observableTaskTable?->observableChannels->remove($channel); + })); } public function test_task_is_observed(): void { - $observableTask = new ObservableTask(static function () { + self::assertNotNull($this->observableTaskTable); + self::assertNull($this->observableTaskTable->getStatus('0')); + + $slotId = $this->observableTaskTable->observe(new ObservableTask(static function () { yield new ObservableTaskStatusUpdate( ObservableTaskStatus::Running, 'test', ); - }); - - self::assertNull($this->observableTaskTable?->getStatus('0')); - - $slotId = $this->observableTaskTable?->observe($observableTask); + })); self::assertSame('0', $slotId); - $status = $this->observableTaskTable?->getStatus($slotId); + $status = $this->observableTaskTable->getStatus($slotId); self::assertInstanceOf(ObservableTaskStatusUpdate::class, $status); self::assertSame(ObservableTaskStatus::Running, $status->status); diff --git a/src/PHPUnitSwooleCoroutineExtension.php b/src/PHPUnitSwooleCoroutineExtension.php new file mode 100644 index 0000000000000000000000000000000000000000..9128a229b382ed5be6480fa616f29d06a28b4819 --- /dev/null +++ b/src/PHPUnitSwooleCoroutineExtension.php @@ -0,0 +1,43 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +use PHPUnit\Event\TestRunner\Finished; +use PHPUnit\Event\TestRunner\FinishedSubscriber; +use PHPUnit\Runner\Extension\Extension; +use PHPUnit\Runner\Extension\Facade as EventFacade; +use PHPUnit\Runner\Extension\ParameterCollection; +use PHPUnit\TextUI\Configuration\Configuration; +use Swoole\Coroutine; +use Swoole\Timer; + +final class PHPUnitSwooleCoroutineExtension implements Extension +{ + public function bootstrap(Configuration $configuration, EventFacade $facade, ParameterCollection $parameters): void + { + $facade->registerSubscriber(new class($this) implements FinishedSubscriber { + public function __construct(private PHPUnitSwooleCoroutineExtension $thisClass) {} + + public function notify(Finished $event): void + { + $this->thisClass->executeAfterLastTest(); + } + }); + } + + public function executeAfterLastTest(): void + { + /** + * @var array{ coroutine_num: int } $coroutineStats + */ + $coroutineStats = Coroutine::stats(); + + while ($coroutineStats['coroutine_num'] > 1) { + Coroutine::sleep(0.1); + } + + Timer::clearAll(); + } +} diff --git a/src/PostfixBounceAnalyzerTest.php b/src/PostfixBounceAnalyzerTest.php index f05f057f13ac81a6f414c63f3a4ea0cc30e3e31d..d26af5cec0044094984ba22cc65cb8a3cdc9bb69 100644 --- a/src/PostfixBounceAnalyzerTest.php +++ b/src/PostfixBounceAnalyzerTest.php @@ -13,8 +13,6 @@ use PHPUnit\Framework\TestCase; #[CoversClass(PostfixBounceAnalyzer::class)] final class PostfixBounceAnalyzerTest extends TestCase { - use TestsDependencyInectionContainerTrait; - public const DELIVERY_REPORT = <<<'REPORT' From double-bounce@myhost Sat Feb 3 08:52:39 2024 Return-Path: <double-bounce@myhost> @@ -82,7 +80,7 @@ final class PostfixBounceAnalyzerTest extends TestCase public function test_delivery_report_is_analyzed(): void { - $analyzer = self::$container->make(PostfixBounceAnalyzer::class); + $analyzer = new PostfixBounceAnalyzer(); $report = $analyzer->extractReport(self::DELIVERY_REPORT); diff --git a/src/SwooleTimeout.php b/src/SwooleTimeout.php index 978a27ffbf5e5ab37b60091cbc2bfbe3e517415b..a74fcfee1c9d31128454a8f546fc70c8439cf070 100644 --- a/src/SwooleTimeout.php +++ b/src/SwooleTimeout.php @@ -5,24 +5,29 @@ declare(strict_types=1); namespace Distantmagic\Resonance; use Closure; +use RuntimeException; +use Swoole\Timer; readonly class SwooleTimeout { private Closure $callback; - private SwooleTimeoutScheduler $swooleTimeoutScheduler; public function __construct(callable $callback) { $this->callback = Closure::fromCallable($callback); - $this->swooleTimeoutScheduler = new SwooleTimeoutScheduler(); } public function setTimeout(float $timeout): SwooleTimeoutScheduled { - return new SwooleTimeoutScheduled( - $this->callback, - $this->swooleTimeoutScheduler->scheduleTimeout($timeout, $this->callback), - $this->swooleTimeoutScheduler, - ); + /** + * @var false|int $timerId + */ + $timerId = Timer::after((int) ($timeout * 1000), $this->callback); + + if (!is_int($timerId)) { + throw new RuntimeException('Unable to schedule a timer'); + } + + return new SwooleTimeoutScheduled($this->callback, $timerId); } } diff --git a/src/SwooleTimeoutScheduled.php b/src/SwooleTimeoutScheduled.php index 9e9faae44871cbd504b75e909ea9726daffa72fc..735ddb78455bf0a53f80c648f174ff274ccafaa9 100644 --- a/src/SwooleTimeoutScheduled.php +++ b/src/SwooleTimeoutScheduled.php @@ -6,7 +6,7 @@ namespace Distantmagic\Resonance; use Closure; use RuntimeException; -use Swoole\Coroutine; +use Swoole\Timer; readonly class SwooleTimeoutScheduled { @@ -14,22 +14,17 @@ readonly class SwooleTimeoutScheduled public function __construct( callable $callback, - private int $coroutineId, - private SwooleTimeoutScheduler $swooleTimeoutScheduler, + private int $timeoutId, ) { $this->callback = Closure::fromCallable($callback); } public function cancel(): bool { - if (!Coroutine::exists($this->coroutineId)) { - return true; - } - /** * @var bool */ - return Coroutine::cancel($this->coroutineId); + return Timer::clear($this->timeoutId); } public function reschedule(float $timeout): self @@ -38,10 +33,15 @@ readonly class SwooleTimeoutScheduled throw new RuntimeException('Unable to cancel a coroutine.'); } - return new self( - $this->callback, - $this->swooleTimeoutScheduler->scheduleTimeout($timeout, $this->callback), - $this->swooleTimeoutScheduler, - ); + /** + * @var false|int $timerId + */ + $timerId = Timer::after((int) ($timeout * 1000), $this->callback); + + if (!is_int($timerId)) { + throw new RuntimeException('Unable to schedule a timer'); + } + + return new self($this->callback, $timerId); } } diff --git a/src/SwooleTimeoutScheduler.php b/src/SwooleTimeoutScheduler.php deleted file mode 100644 index 6eb5d1807da134517ab11c682595d9f200ffe38e..0000000000000000000000000000000000000000 --- a/src/SwooleTimeoutScheduler.php +++ /dev/null @@ -1,23 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Distantmagic\Resonance; - -use Swoole\Coroutine; - -readonly class SwooleTimeoutScheduler -{ - public function scheduleTimeout( - float $timeout, - callable $callback, - ): int { - return SwooleCoroutineHelper::mustGo(static function () use ($callback, $timeout) { - Coroutine::sleep($timeout); - - if (!Coroutine::isCanceled()) { - $callback(); - } - }); - } -} diff --git a/src/SwooleTimeoutTest.php b/src/SwooleTimeoutTest.php index f3fdd91458ea1d04fa05d425411ce855abe9eea8..2e2c807801986b5fb9fd8bb306eb4bdf132f9b27 100644 --- a/src/SwooleTimeoutTest.php +++ b/src/SwooleTimeoutTest.php @@ -13,7 +13,6 @@ use Swoole\Event; */ #[CoversClass(SwooleTimeout::class)] #[CoversClass(SwooleTimeoutScheduled::class)] -#[CoversClass(SwooleTimeoutScheduler::class)] final class SwooleTimeoutTest extends TestCase { protected function tearDown(): void @@ -42,7 +41,7 @@ final class SwooleTimeoutTest extends TestCase $timeout = new SwooleTimeout(static function () use ($before) { $after = microtime(true); - self::assertGreaterThan(0.03, $after - $before); + self::assertGreaterThan(0.029, $after - $before); self::assertLessThan(0.035, $after - $before); }); diff --git a/src/WebSocketConnection.php b/src/WebSocketConnection.php index 84063de0558eaf9dc9afbfcdc3eafb0285fee0c6..94b6e5c6cc63cdd8876ffcb785a5484adf093dbd 100644 --- a/src/WebSocketConnection.php +++ b/src/WebSocketConnection.php @@ -19,8 +19,8 @@ class WebSocketConnection public readonly int $fd, ) {} - public function push(string|Stringable $response): void + public function push(string|Stringable $response): bool { - $this->server->push($this->fd, (string) $response); + return $this->server->push($this->fd, (string) $response); } } diff --git a/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php b/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php index a51d99a9b7b4cd45a4b1d9dc3f7e088075a6390d..b38daff1258d4d01f79d0ba5f2d52c0dbf235d79 100644 --- a/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php +++ b/src/WebSocketRPCResponder/ObservableTasksTableUpdateResponder.php @@ -6,15 +6,13 @@ namespace Distantmagic\Resonance\WebSocketRPCResponder; use Distantmagic\Resonance\Constraint; use Distantmagic\Resonance\Constraint\ObjectConstraint; +use Distantmagic\Resonance\ObservableTaskSlotStatusUpdate; use Distantmagic\Resonance\ObservableTaskTable; -use Distantmagic\Resonance\ObservableTaskTableSlotStatusUpdateIterator; use Distantmagic\Resonance\RPCRequest; use Distantmagic\Resonance\RPCResponse; use Distantmagic\Resonance\WebSocketAuthResolution; use Distantmagic\Resonance\WebSocketConnection; use Distantmagic\Resonance\WebSocketRPCResponder; -use Psr\Log\LoggerInterface; -use WeakMap; /** * @template TPayload @@ -23,53 +21,34 @@ use WeakMap; */ readonly class ObservableTasksTableUpdateResponder extends WebSocketRPCResponder { - /** - * @var WeakMap<WebSocketConnection,ObservableTaskTableSlotStatusUpdateIterator> - */ - private WeakMap $runningCompletions; - public function __construct( - private LoggerInterface $logger, private ObservableTaskTable $observableTaskTable, - ) { - /** - * @var WeakMap<WebSocketConnection,ObservableTaskTableSlotStatusUpdateIterator> - */ - $this->runningCompletions = new WeakMap(); - } + ) {} public function getConstraint(): Constraint { return new ObjectConstraint(); } - public function onBeforeMessage( - WebSocketAuthResolution $webSocketAuthResolution, - WebSocketConnection $webSocketConnection, - ): void { - if ($this->runningCompletions->offsetExists($webSocketConnection)) { - $this->runningCompletions->offsetGet($webSocketConnection)->close(); - } - } - public function onRequest( WebSocketAuthResolution $webSocketAuthResolution, WebSocketConnection $webSocketConnection, RPCRequest $rpcRequest, ): void { - $observableTaskUpdatesIterator = new ObservableTaskTableSlotStatusUpdateIterator($this->observableTaskTable); - - $this->runningCompletions->offsetSet($webSocketConnection, $observableTaskUpdatesIterator); - - foreach ($observableTaskUpdatesIterator as $observableTaskSlotStatusUpdate) { - if (!$webSocketConnection->status->isOpen()) { - break; + $this->observableTaskTable->observers->add( + static function (ObservableTaskSlotStatusUpdate $observableTaskSlotStatusUpdate) use ( + $rpcRequest, + $webSocketConnection, + ): bool { + if (!$webSocketConnection->status->isOpen()) { + return false; + } + + return $webSocketConnection->push(new RPCResponse( + rpcRequest: $rpcRequest, + content: $observableTaskSlotStatusUpdate, + )); } - - $webSocketConnection->push(new RPCResponse( - rpcRequest: $rpcRequest, - content: $observableTaskSlotStatusUpdate, - )); - } + ); } }