diff --git a/Makefile b/Makefile index 1866eb795be6923f8a0cd7908a21a58c9e2c6049..6c826fe20e832b76fba23d54852b678f40007650 100644 --- a/Makefile +++ b/Makefile @@ -159,6 +159,14 @@ psalm: tools/psalm/vendor/bin/psalm vendor --show-info=true \ --root=$(CURDIR) +.PHONY: psalm.taint +psalm.taint: node_modules vendor + ./tools/psalm/vendor/bin/psalm \ + --no-cache \ + --show-info=true \ + --root=$(CURDIR) \ + --taint-analysis + .PHONY: psalm.watch psalm.watch: node_modules vendor ./node_modules/.bin/nodemon \ diff --git a/composer.lock b/composer.lock index 50c356a4785418e092077c99acd263de356e99dc..86c09ec88b2f3f20e442c6f0b98948808f843534 100644 --- a/composer.lock +++ b/composer.lock @@ -1048,16 +1048,16 @@ }, { "name": "doctrine/dbal", - "version": "3.7.2", + "version": "3.7.3", "source": { "type": "git", "url": "https://github.com/doctrine/dbal.git", - "reference": "0ac3c270590e54910715e9a1a044cc368df282b2" + "reference": "ce594cbc39a4866c544f1a970d285ff0548221ad" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/dbal/zipball/0ac3c270590e54910715e9a1a044cc368df282b2", - "reference": "0ac3c270590e54910715e9a1a044cc368df282b2", + "url": "https://api.github.com/repos/doctrine/dbal/zipball/ce594cbc39a4866c544f1a970d285ff0548221ad", + "reference": "ce594cbc39a4866c544f1a970d285ff0548221ad", "shasum": "" }, "require": { @@ -1073,14 +1073,14 @@ "doctrine/coding-standard": "12.0.0", "fig/log-test": "^1", "jetbrains/phpstorm-stubs": "2023.1", - "phpstan/phpstan": "1.10.42", + "phpstan/phpstan": "1.10.56", "phpstan/phpstan-strict-rules": "^1.5", - "phpunit/phpunit": "9.6.13", + "phpunit/phpunit": "9.6.15", "psalm/plugin-phpunit": "0.18.4", "slevomat/coding-standard": "8.13.1", - "squizlabs/php_codesniffer": "3.7.2", - "symfony/cache": "^5.4|^6.0", - "symfony/console": "^4.4|^5.4|^6.0", + "squizlabs/php_codesniffer": "3.8.1", + "symfony/cache": "^5.4|^6.0|^7.0", + "symfony/console": "^4.4|^5.4|^6.0|^7.0", "vimeo/psalm": "4.30.0" }, "suggest": { @@ -1141,7 +1141,7 @@ ], "support": { "issues": "https://github.com/doctrine/dbal/issues", - "source": "https://github.com/doctrine/dbal/tree/3.7.2" + "source": "https://github.com/doctrine/dbal/tree/3.7.3" }, "funding": [ { @@ -1157,7 +1157,7 @@ "type": "tidelift" } ], - "time": "2023-11-19T08:06:58+00:00" + "time": "2024-01-21T07:53:09+00:00" }, { "name": "doctrine/deprecations", @@ -1640,16 +1640,16 @@ }, { "name": "doctrine/orm", - "version": "2.17.2", + "version": "2.17.3", "source": { "type": "git", "url": "https://github.com/doctrine/orm.git", - "reference": "393679a4795e49b0b3ac317dce84d0f8888f2b77" + "reference": "398ab0547aaf90bdb352b560a94c24f44ff00670" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/orm/zipball/393679a4795e49b0b3ac317dce84d0f8888f2b77", - "reference": "393679a4795e49b0b3ac317dce84d0f8888f2b77", + "url": "https://api.github.com/repos/doctrine/orm/zipball/398ab0547aaf90bdb352b560a94c24f44ff00670", + "reference": "398ab0547aaf90bdb352b560a94c24f44ff00670", "shasum": "" }, "require": { @@ -1735,9 +1735,9 @@ ], "support": { "issues": "https://github.com/doctrine/orm/issues", - "source": "https://github.com/doctrine/orm/tree/2.17.2" + "source": "https://github.com/doctrine/orm/tree/2.17.3" }, - "time": "2023-12-20T21:47:52+00:00" + "time": "2024-01-16T21:32:04+00:00" }, { "name": "doctrine/persistence", @@ -3049,16 +3049,16 @@ }, { "name": "nette/php-generator", - "version": "v4.1.2", + "version": "v4.1.3", "source": { "type": "git", "url": "https://github.com/nette/php-generator.git", - "reference": "abc0e79b2d02d4b8aba5933765b90df3f610c143" + "reference": "08ab9bff22ae34fe4e1d2fe8ba16b3770ea2459f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nette/php-generator/zipball/abc0e79b2d02d4b8aba5933765b90df3f610c143", - "reference": "abc0e79b2d02d4b8aba5933765b90df3f610c143", + "url": "https://api.github.com/repos/nette/php-generator/zipball/08ab9bff22ae34fe4e1d2fe8ba16b3770ea2459f", + "reference": "08ab9bff22ae34fe4e1d2fe8ba16b3770ea2459f", "shasum": "" }, "require": { @@ -3068,7 +3068,7 @@ "require-dev": { "jetbrains/phpstorm-attributes": "dev-master", "nette/tester": "^2.4", - "nikic/php-parser": "^4.15", + "nikic/php-parser": "^4.18 || ^5.0", "phpstan/phpstan": "^1.0", "tracy/tracy": "^2.8" }, @@ -3112,37 +3112,37 @@ ], "support": { "issues": "https://github.com/nette/php-generator/issues", - "source": "https://github.com/nette/php-generator/tree/v4.1.2" + "source": "https://github.com/nette/php-generator/tree/v4.1.3" }, - "time": "2023-10-29T22:57:32+00:00" + "time": "2024-01-18T17:44:20+00:00" }, { "name": "nette/schema", - "version": "v1.2.5", + "version": "v1.3.0", "source": { "type": "git", "url": "https://github.com/nette/schema.git", - "reference": "0462f0166e823aad657c9224d0f849ecac1ba10a" + "reference": "a6d3a6d1f545f01ef38e60f375d1cf1f4de98188" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nette/schema/zipball/0462f0166e823aad657c9224d0f849ecac1ba10a", - "reference": "0462f0166e823aad657c9224d0f849ecac1ba10a", + "url": "https://api.github.com/repos/nette/schema/zipball/a6d3a6d1f545f01ef38e60f375d1cf1f4de98188", + "reference": "a6d3a6d1f545f01ef38e60f375d1cf1f4de98188", "shasum": "" }, "require": { - "nette/utils": "^2.5.7 || ^3.1.5 || ^4.0", - "php": "7.1 - 8.3" + "nette/utils": "^4.0", + "php": "8.1 - 8.3" }, "require-dev": { - "nette/tester": "^2.3 || ^2.4", + "nette/tester": "^2.4", "phpstan/phpstan-nette": "^1.0", - "tracy/tracy": "^2.7" + "tracy/tracy": "^2.8" }, "type": "library", "extra": { "branch-alias": { - "dev-master": "1.2-dev" + "dev-master": "1.3-dev" } }, "autoload": { @@ -3174,22 +3174,22 @@ ], "support": { "issues": "https://github.com/nette/schema/issues", - "source": "https://github.com/nette/schema/tree/v1.2.5" + "source": "https://github.com/nette/schema/tree/v1.3.0" }, - "time": "2023-10-05T20:37:59+00:00" + "time": "2023-12-11T11:54:22+00:00" }, { "name": "nette/utils", - "version": "v4.0.3", + "version": "v4.0.4", "source": { "type": "git", "url": "https://github.com/nette/utils.git", - "reference": "a9d127dd6a203ce6d255b2e2db49759f7506e015" + "reference": "d3ad0aa3b9f934602cb3e3902ebccf10be34d218" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nette/utils/zipball/a9d127dd6a203ce6d255b2e2db49759f7506e015", - "reference": "a9d127dd6a203ce6d255b2e2db49759f7506e015", + "url": "https://api.github.com/repos/nette/utils/zipball/d3ad0aa3b9f934602cb3e3902ebccf10be34d218", + "reference": "d3ad0aa3b9f934602cb3e3902ebccf10be34d218", "shasum": "" }, "require": { @@ -3260,9 +3260,9 @@ ], "support": { "issues": "https://github.com/nette/utils/issues", - "source": "https://github.com/nette/utils/tree/v4.0.3" + "source": "https://github.com/nette/utils/tree/v4.0.4" }, - "time": "2023-10-29T21:02:13+00:00" + "time": "2024-01-17T16:50:36+00:00" }, { "name": "nicmart/tree", @@ -6701,16 +6701,16 @@ }, { "name": "webonyx/graphql-php", - "version": "v15.8.1", + "version": "v15.9.0", "source": { "type": "git", "url": "https://github.com/webonyx/graphql-php.git", - "reference": "575ac95f13adfb38219a748572355385c101fdf7" + "reference": "84a9e2ccd06c8d643bc1e3cf9cf0076cbfa3dc36" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/webonyx/graphql-php/zipball/575ac95f13adfb38219a748572355385c101fdf7", - "reference": "575ac95f13adfb38219a748572355385c101fdf7", + "url": "https://api.github.com/repos/webonyx/graphql-php/zipball/84a9e2ccd06c8d643bc1e3cf9cf0076cbfa3dc36", + "reference": "84a9e2ccd06c8d643bc1e3cf9cf0076cbfa3dc36", "shasum": "" }, "require": { @@ -6723,19 +6723,19 @@ "amphp/http-server": "^2.1", "dms/phpunit-arraysubset-asserts": "dev-master", "ergebnis/composer-normalize": "^2.28", - "friendsofphp/php-cs-fixer": "3.30.0", + "friendsofphp/php-cs-fixer": "3.48.0", "mll-lab/php-cs-fixer-config": "^5", "nyholm/psr7": "^1.5", "phpbench/phpbench": "^1.2", "phpstan/extension-installer": "^1.1", - "phpstan/phpstan": "1.10.47", + "phpstan/phpstan": "1.10.56", "phpstan/phpstan-phpunit": "1.3.15", "phpstan/phpstan-strict-rules": "1.5.2", "phpunit/phpunit": "^9.5 || ^10", "psr/http-message": "^1 || ^2", "react/http": "^1.6", - "react/promise": "^2.9", - "rector/rector": "^0.18", + "react/promise": "^2.0 || ^3.0", + "rector/rector": "^0.19", "symfony/polyfill-php81": "^1.23", "symfony/var-exporter": "^5 || ^6 || ^7", "thecodingmachine/safe": "^1.3 || ^2" @@ -6763,7 +6763,7 @@ ], "support": { "issues": "https://github.com/webonyx/graphql-php/issues", - "source": "https://github.com/webonyx/graphql-php/tree/v15.8.1" + "source": "https://github.com/webonyx/graphql-php/tree/v15.9.0" }, "funding": [ { @@ -6771,7 +6771,7 @@ "type": "open_collective" } ], - "time": "2023-12-05T17:23:35+00:00" + "time": "2024-01-21T09:37:43+00:00" } ], "packages-dev": [ @@ -7460,16 +7460,16 @@ }, { "name": "phpunit/phpunit", - "version": "10.5.7", + "version": "10.5.9", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "e5c5b397a95cb0db013270a985726fcae93e61b8" + "reference": "0bd663704f0165c9e76fe4f06ffa6a1ca727fdbe" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/e5c5b397a95cb0db013270a985726fcae93e61b8", - "reference": "e5c5b397a95cb0db013270a985726fcae93e61b8", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/0bd663704f0165c9e76fe4f06ffa6a1ca727fdbe", + "reference": "0bd663704f0165c9e76fe4f06ffa6a1ca727fdbe", "shasum": "" }, "require": { @@ -7541,7 +7541,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", "security": "https://github.com/sebastianbergmann/phpunit/security/policy", - "source": "https://github.com/sebastianbergmann/phpunit/tree/10.5.7" + "source": "https://github.com/sebastianbergmann/phpunit/tree/10.5.9" }, "funding": [ { @@ -7557,7 +7557,7 @@ "type": "tidelift" } ], - "time": "2024-01-14T16:40:30+00:00" + "time": "2024-01-22T14:35:40+00:00" }, { "name": "sebastian/cli-parser", diff --git a/docs/pages/docs/features/ai/server/llama-cpp/index.md b/docs/pages/docs/features/ai/server/llama-cpp/index.md index 365c1cdbf2ed9c0cac4aa376ff829b3fc5337fac..30469b001edc73d58db510fec8a07dfab9012cf0 100644 --- a/docs/pages/docs/features/ai/server/llama-cpp/index.md +++ b/docs/pages/docs/features/ai/server/llama-cpp/index.md @@ -63,10 +63,6 @@ class LlamaCppGenerate // from the model you are using foreach ($completion as $token) { swoole_error_log(SWOOLE_LOG_DEBUG, (string) $token); - - if ($token->isLast) { - // ...do something else - } } } } @@ -76,21 +72,17 @@ class LlamaCppGenerate :::danger Using just the `break` keyword does not stop the completion request. You need -to use `$completion->send(...)`. - -See also: [Generator::send(...)](https://www.php.net/manual/en/generator.send.php) +to use `$completion->stop()`. ::: For example, stops after generating 10 tokens: ```php -use Distantmagic\Resonance\LlamaCppCompletionCommand; - $i = 0; foreach ($completion as $token) { if ($i > 9) { - $completion->send(LlamaCppCompletionCommand::Stop); + $completion->stop(); } else { // do something diff --git a/docs/pages/tutorials/how-to-create-llm-websocket-chat-with-llama-cpp/index.md b/docs/pages/tutorials/how-to-create-llm-websocket-chat-with-llama-cpp/index.md index 7c054142b8b8ee643b8e8daf4560db09e75e5cea..a263fd977bf0e3ee086bcdc84fdfe2cc950f7c5d 100644 --- a/docs/pages/tutorials/how-to-create-llm-websocket-chat-with-llama-cpp/index.md +++ b/docs/pages/tutorials/how-to-create-llm-websocket-chat-with-llama-cpp/index.md @@ -204,7 +204,6 @@ use Distantmagic\Resonance\Attribute\RespondsToWebSocketRPC; use Distantmagic\Resonance\Attribute\Singleton; use Distantmagic\Resonance\Feature; use Distantmagic\Resonance\LlamaCppClient; -use Distantmagic\Resonance\LlamaCppCompletionCommand; use Distantmagic\Resonance\LlamaCppCompletionRequest; use Distantmagic\Resonance\RPCNotification; use Distantmagic\Resonance\RPCRequest; @@ -241,7 +240,7 @@ final readonly class LlmChatPromptResponder extends WebSocketRPCResponder $token->content, )); } else { - $completion->send(LlamaCppCompletionCommand::Stop); + $completion->stop(); } } } diff --git a/docs/pages/tutorials/how-to-serve-llm-completions/index.md b/docs/pages/tutorials/how-to-serve-llm-completions/index.md index 166820e4b10c8205f95e65e807bbbf59179a0eb8..60ad9a9007538a56177f262dff1d8f04cfd5c481 100644 --- a/docs/pages/tutorials/how-to-serve-llm-completions/index.md +++ b/docs/pages/tutorials/how-to-serve-llm-completions/index.md @@ -128,10 +128,6 @@ class LlamaCppGenerate // from the model you are using foreach ($completion as $token) { swoole_error_log(SWOOLE_LOG_DEBUG, (string) $token); - - if ($token->isLast) { - // ...do something else - } } } } diff --git a/psalm.xml b/psalm.xml index ae4116b3a0bbc5467f98981c728a9b0f75721bc0..c55a91c5105000a50d0886a5ded9405909547213 100644 --- a/psalm.xml +++ b/psalm.xml @@ -76,6 +76,7 @@ <file name="vendor/swoole/ide-helper/src/swoole/Swoole/Http/Request.php" /> <file name="vendor/swoole/ide-helper/src/swoole/Swoole/Http/Response.php" /> <file name="vendor/swoole/ide-helper/src/swoole/Swoole/Http/Server.php" /> + <file name="vendor/swoole/ide-helper/src/swoole/Swoole/Table.php" /> <file name="vendor/swoole/ide-helper/src/swoole/Swoole/WebSocket/Frame.php" /> <file name="vendor/swoole/ide-helper/src/swoole/Swoole/WebSocket/Server.php" /> <file name="vendor/swoole/ide-helper/src/swoole_library/src/core/ConnectionPool.php" /> diff --git a/src/Command/Serve.php b/src/Command/Serve.php index 85dcbf7776428e0165c0de10587d7d4a9624b310..d167b0204469572427c81b4b1c31632a0275c461 100644 --- a/src/Command/Serve.php +++ b/src/Command/Serve.php @@ -13,11 +13,14 @@ use Distantmagic\Resonance\Event\HttpServerStarted; use Distantmagic\Resonance\EventDispatcherInterface; use Distantmagic\Resonance\HttpResponderAggregate; use Distantmagic\Resonance\SwooleConfiguration; +use Distantmagic\Resonance\WebSocketConnectionShutdownCommand; use Distantmagic\Resonance\WebSocketServerController; use Psr\Log\LoggerInterface; +use RuntimeException; use Swoole\Http\Request; use Swoole\Http\Response; use Swoole\Http\Server as HttpServer; +use Swoole\Server; use Swoole\WebSocket\Server as WebSocketServer; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; @@ -74,6 +77,7 @@ final class Serve extends Command ]); $this->server->on('beforeShutdown', $this->onBeforeShutdown(...)); + $this->server->on('pipeMessage', $this->onPipeMessage(...)); $this->server->on('request', $this->httpResponderAggregate->respond(...)); $this->server->on('start', $this->onStart(...)); @@ -89,7 +93,7 @@ final class Serve extends Command private function onBeforeShutdown(): void { - $this->eventDispatcher->dispatch(new HttpServerBeforeStop()); + $this->eventDispatcher->dispatch(new HttpServerBeforeStop($this->server)); } private function onHandshake(Request $request, Response $response): void @@ -99,6 +103,20 @@ final class Serve extends Command } } + private function onPipeMessage(Server $server, int $srcWorkerId, mixed $data): void + { + if ($data instanceof WebSocketConnectionShutdownCommand) { + if (!($server instanceof WebSocketServer)) { + throw new RuntimeException(sprintf( + 'Only works with WebSocket server: "%s"', + WebSocketConnectionShutdownCommand::class, + )); + } + + $this->webSocketServerController?->onClose($data->fd); + } + } + private function onStart(): void { $this->eventDispatcher->dispatch(new HttpServerStarted()); diff --git a/src/DatabaseConnection.php b/src/DatabaseConnection.php index b9ce439908e27f252013dd51e781b19cd8aac8a1..063b0267da79cc291fa300a5d8cfd0a133baeeb0 100644 --- a/src/DatabaseConnection.php +++ b/src/DatabaseConnection.php @@ -61,6 +61,9 @@ readonly class DatabaseConnection implements ServerInfoAwareConnection return true; } + /** + * @psalm-taint-sink sql $sql + */ public function exec(string $sql): int { /** diff --git a/src/Event/HttpServerBeforeStop.php b/src/Event/HttpServerBeforeStop.php index ea16eb44c6dc2a8c6165db6b4fc2155fa52e152f..ee4bf31556efff33c7ee51aa8ec6969ab0948522 100644 --- a/src/Event/HttpServerBeforeStop.php +++ b/src/Event/HttpServerBeforeStop.php @@ -5,8 +5,9 @@ declare(strict_types=1); namespace Distantmagic\Resonance\Event; use Distantmagic\Resonance\Event; +use Swoole\Server; final readonly class HttpServerBeforeStop extends Event { - public function __construct() {} + public function __construct(public Server $server) {} } diff --git a/src/EventListener/CloseWebSocketConnections.php b/src/EventListener/CloseWebSocketConnections.php new file mode 100644 index 0000000000000000000000000000000000000000..621695d861585ddce26d45b162b5f9425cac5cb7 --- /dev/null +++ b/src/EventListener/CloseWebSocketConnections.php @@ -0,0 +1,58 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance\EventListener; + +use Distantmagic\Resonance\Attribute\ListensTo; +use Distantmagic\Resonance\Attribute\Singleton; +use Distantmagic\Resonance\Event\HttpServerBeforeStop; +use Distantmagic\Resonance\EventInterface; +use Distantmagic\Resonance\EventListener; +use Distantmagic\Resonance\Feature; +use Distantmagic\Resonance\SingletonCollection; +use Distantmagic\Resonance\WebSocketConnectionShutdownCommand; +use Distantmagic\Resonance\WebSocketServerConnectionTable; +use RuntimeException; + +/** + * @template-extends EventListener<HttpServerBeforeStop,void> + */ +#[ListensTo(HttpServerBeforeStop::class)] +#[Singleton( + collection: SingletonCollection::EventListener, + grantsFeature: Feature::WebSocket, +)] +final readonly class CloseWebSocketConnections extends EventListener +{ + public function __construct( + private ?WebSocketServerConnectionTable $webSocketServerConnectionTable = null, + ) {} + + /** + * @param HttpServerBeforeStop $event + */ + public function handle(EventInterface $event): void + { + if (!$this->webSocketServerConnectionTable) { + throw new RuntimeException('WebSocket close connections listener should not have been registered'); + } + + foreach ($this->webSocketServerConnectionTable as $fd => $workerId) { + /** + * @psalm-suppress InvalidArgument `sendMessage` has type errors + */ + if (!$event->server->sendMessage( + new WebSocketConnectionShutdownCommand($fd), + $workerId, + )) { + throw new RuntimeException('Unable to send server message'); + } + } + } + + public function shouldRegister(): bool + { + return !is_null($this->webSocketServerConnectionTable); + } +} diff --git a/src/HttpRequestData.php b/src/HttpRequestData.php index f153123b1a9d2f1dba039e28cb8bb49d15fe8173..ca086df9ecf1e7e141f6d0b83c9fdd4f3bac38a7 100644 --- a/src/HttpRequestData.php +++ b/src/HttpRequestData.php @@ -8,6 +8,9 @@ use RuntimeException; readonly class HttpRequestData { + /** + * @psalm-taint-source user_data $data + */ public function __construct(private mixed $data) {} public function get(string $name, ?string $default = null): ?string diff --git a/src/LlamaCppClient.php b/src/LlamaCppClient.php index 16dc0e2ac6a22f977ac02a308c160876b0d7d8ff..facbdaecec53e3521010ec4387e2d82efdbaf762 100644 --- a/src/LlamaCppClient.php +++ b/src/LlamaCppClient.php @@ -14,60 +14,20 @@ use Swoole\Coroutine\Channel; #[Singleton] readonly class LlamaCppClient { - // strlen('data: ') - private const COMPLETION_CHUNKED_DATA_PREFIX_LENGTH = 6; - public function __construct( private JsonSerializer $jsonSerializer, private LlamaCppConfiguration $llamaCppConfiguration, private LlamaCppLinkBuilder $llamaCppLinkBuilder, ) {} - /** - * @return Generator<int,LlamaCppCompletionToken,null|LlamaCppCompletionCommand> - */ - public function generateCompletion(LlamaCppCompletionRequest $request): Generator + public function generateCompletion(LlamaCppCompletionRequest $request): LlamaCppCompletionIterator { $responseChunks = $this->streamResponse($request, '/completion'); - /** - * @var null|string - */ - $previousContent = null; - - foreach ($responseChunks as $responseChunk) { - /** - * @var object{ - * content: string, - * stop: boolean, - * } - */ - $unserializedToken = $this->jsonSerializer->unserialize( - json: $responseChunk, - offset: self::COMPLETION_CHUNKED_DATA_PREFIX_LENGTH, - ); - - if (is_string($previousContent)) { - $shouldContinue = yield new LlamaCppCompletionToken( - content: $previousContent, - isLast: $unserializedToken->stop, - ); - - if (LlamaCppCompletionCommand::Stop === $shouldContinue) { - if (!$responseChunks->channel->close()) { - throw new RuntimeException('Unable to close coroutine channel'); - } - - break; - } - - $previousContent = null; - } - - if (!$unserializedToken->stop) { - $previousContent = $unserializedToken->content; - } - } + return new LlamaCppCompletionIterator( + $this->jsonSerializer, + $responseChunks, + ); } public function generateEmbedding(LlamaCppEmbeddingRequest $request): LlamaCppEmbedding diff --git a/src/LlamaCppCompletionCommand.php b/src/LlamaCppCompletionCommand.php deleted file mode 100644 index 99dbf1f67ddcb539cfaca9c15db4b11575450624..0000000000000000000000000000000000000000 --- a/src/LlamaCppCompletionCommand.php +++ /dev/null @@ -1,10 +0,0 @@ -<?php - -declare(strict_types=1); - -namespace Distantmagic\Resonance; - -enum LlamaCppCompletionCommand -{ - case Stop; -} diff --git a/src/LlamaCppCompletionIterator.php b/src/LlamaCppCompletionIterator.php new file mode 100644 index 0000000000000000000000000000000000000000..bc5ee46e65d922e773848ab985675fac14b046c2 --- /dev/null +++ b/src/LlamaCppCompletionIterator.php @@ -0,0 +1,60 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +use Generator; +use IteratorAggregate; +use RuntimeException; + +/** + * @template-implements IteratorAggregate<LlamaCppCompletionToken> + */ +readonly class LlamaCppCompletionIterator implements IteratorAggregate +{ + // strlen('data: ') + private const COMPLETION_CHUNKED_DATA_PREFIX_LENGTH = 6; + + /** + * @param SwooleChannelIterator<string> $responseChunks + */ + public function __construct( + private JsonSerializer $jsonSerializer, + private SwooleChannelIterator $responseChunks, + ) {} + + /** + * @return Generator<LlamaCppCompletionToken> + */ + public function getIterator(): Generator + { + foreach ($this->responseChunks as $responseChunk) { + /** + * @var object{ + * content: string, + * stop: boolean, + * } + */ + $unserializedToken = $this->jsonSerializer->unserialize( + json: $responseChunk, + offset: self::COMPLETION_CHUNKED_DATA_PREFIX_LENGTH, + ); + + yield new LlamaCppCompletionToken( + content: $unserializedToken->content, + ); + } + } + + public function stop(): void + { + if (SWOOLE_CHANNEL_OK !== $this->responseChunks->channel->errCode) { + return; + } + + if (!$this->responseChunks->channel->close()) { + throw new RuntimeException('Unable to close coroutine channel'); + } + } +} diff --git a/src/LlamaCppCompletionToken.php b/src/LlamaCppCompletionToken.php index 6b3e9fb90f275624d07600677eef72c69908f7fa..22e71249237df6011e5c991403f566ffb11d4240 100644 --- a/src/LlamaCppCompletionToken.php +++ b/src/LlamaCppCompletionToken.php @@ -10,7 +10,6 @@ readonly class LlamaCppCompletionToken implements Stringable { public function __construct( public string $content, - public bool $isLast, ) {} public function __toString(): string diff --git a/src/LlamaCppConfiguration.php b/src/LlamaCppConfiguration.php index dd9654c4f2ee27788d3dc6dba4c8f40d9d1a6f2b..afdb15e8b383ca96aeea7ea1aee011c6257f1118 100644 --- a/src/LlamaCppConfiguration.php +++ b/src/LlamaCppConfiguration.php @@ -4,13 +4,27 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use SensitiveParameter; + readonly class LlamaCppConfiguration { + /** + * @psalm-taint-source system_secret $apiKey + * @psalm-taint-source system_secret $completionTokenTimeout + * @psalm-taint-source system_secret $host + * @psalm-taint-source system_secret $port + * @psalm-taint-source system_secret $scheme + */ public function __construct( + #[SensitiveParameter] public ?string $apiKey, + #[SensitiveParameter] public float $completionTokenTimeout, + #[SensitiveParameter] public string $host, + #[SensitiveParameter] public int $port, + #[SensitiveParameter] public string $scheme, ) {} } diff --git a/src/RedisConnectionPoolConfiguration.php b/src/RedisConnectionPoolConfiguration.php index ff9f65ddc7a0b27d7cb7ee6eb8cbfbbdea47ccad..8ef6515289283fbac7c38cb325259a4cfdf2a54a 100644 --- a/src/RedisConnectionPoolConfiguration.php +++ b/src/RedisConnectionPoolConfiguration.php @@ -8,6 +8,16 @@ use SensitiveParameter; readonly class RedisConnectionPoolConfiguration { + /** + * @psalm-taint-source system_secret $dbIndex + * @psalm-taint-source system_secret $host + * @psalm-taint-source system_secret $password + * @psalm-taint-source system_secret $poolPrefill + * @psalm-taint-source system_secret $poolSize + * @psalm-taint-source system_secret $port + * @psalm-taint-source system_secret $prefix + * @psalm-taint-source system_secret $timeout + */ public function __construct( #[SensitiveParameter] public int $dbIndex, diff --git a/src/SingletonProvider/ConfigurationProvider/WebSocketConfigurationProvider.php b/src/SingletonProvider/ConfigurationProvider/WebSocketConfigurationProvider.php new file mode 100644 index 0000000000000000000000000000000000000000..a49ed0651644b30f02c5e37c2924e7489270d1bb --- /dev/null +++ b/src/SingletonProvider/ConfigurationProvider/WebSocketConfigurationProvider.php @@ -0,0 +1,50 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance\SingletonProvider\ConfigurationProvider; + +use Distantmagic\Resonance\Attribute\Singleton; +use Distantmagic\Resonance\Feature; +use Distantmagic\Resonance\JsonSchema; +use Distantmagic\Resonance\SingletonProvider\ConfigurationProvider; +use Distantmagic\Resonance\WebSocketConfiguration; + +/** + * @template-extends ConfigurationProvider<WebSocketConfiguration, object{ + * max_connections: int, + * }> + */ +#[Singleton( + grantsFeature: Feature::WebSocket, + provides: WebSocketConfiguration::class, +)] +final readonly class WebSocketConfigurationProvider extends ConfigurationProvider +{ + protected function getConfigurationKey(): string + { + return 'swoole'; + } + + protected function makeSchema(): JsonSchema + { + return new JsonSchema([ + 'type' => 'object', + 'properties' => [ + 'max_connections' => [ + 'type' => 'integer', + 'minimum' => 1, + 'maximum' => 65535, + 'default' => 10000, + ], + ], + ]); + } + + protected function provideConfiguration($validatedData): WebSocketConfiguration + { + return new WebSocketConfiguration( + maxConnections: $validatedData->max_connections, + ); + } +} diff --git a/src/StaticPageConfiguration.php b/src/StaticPageConfiguration.php index 0b58c1325f2161eeebe2caf767d453e776fe6bd6..29e678e29937b73d42e856149f3196e12ce4c0ff 100644 --- a/src/StaticPageConfiguration.php +++ b/src/StaticPageConfiguration.php @@ -4,13 +4,23 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use SensitiveParameter; + readonly class StaticPageConfiguration { + /** + * @psalm-taint-source file $esbuildMetafile + * @psalm-taint-source file $outputDirectory + * @psalm-taint-source file $sitemap + */ public function __construct( public string $baseUrl, + #[SensitiveParameter] public string $esbuildMetafile, + #[SensitiveParameter] public string $inputDirectory, public string $outputDirectory, + #[SensitiveParameter] public string $sitemap, public string $stripOutputPrefix, ) {} diff --git a/src/StaticPageSitemapGenerator.php b/src/StaticPageSitemapGenerator.php index f6d35f6455f9296d28fca82292042b07237fc752..1dbd0f39f617a758e40b8ac5721c97e1121bf0ab 100644 --- a/src/StaticPageSitemapGenerator.php +++ b/src/StaticPageSitemapGenerator.php @@ -14,6 +14,9 @@ readonly class StaticPageSitemapGenerator private StaticPageConfiguration $staticPageConfiguration, ) {} + /** + * @psalm-taint-sink file $filename + */ public function writeTo(string $filename): void { $baseUrl = $this->staticPageConfiguration->baseUrl; diff --git a/src/SwooleChannelIterator.php b/src/SwooleChannelIterator.php index 6ce038d4917ad054ab2f6d80e302938629dfd04e..39f91f31c45dd2641b02d039adf1cecc40b585d1 100644 --- a/src/SwooleChannelIterator.php +++ b/src/SwooleChannelIterator.php @@ -27,15 +27,18 @@ readonly class SwooleChannelIterator implements IteratorAggregate } /** + * @psalm-suppress TypeDoesNotContainType false positive, swoole channel + * status + * * @return Generator<int,TData,bool> */ public function getIterator(): Generator { - do { - if (SWOOLE_CHANNEL_CLOSED === $this->channel->errCode) { - return; - } + if (SWOOLE_CHANNEL_OK !== $this->channel->errCode) { + throw new RuntimeException('Channel is not OK.'); + } + do { /** * @var mixed $data explicitly mixed for typechecks */ @@ -44,13 +47,17 @@ readonly class SwooleChannelIterator implements IteratorAggregate if (false === $data) { switch ($this->channel->errCode) { case SWOOLE_CHANNEL_CLOSED: + case SWOOLE_CHANNEL_TIMEOUT: return; case SWOOLE_CHANNEL_OK: throw new RuntimeException('Using "false" is ambiguous in channels'); - case SWOOLE_CHANNEL_TIMEOUT: - throw new RuntimeException('Swoole channel timed out'); } - } else { + } + + /** + * @psalm-suppress RedundantCondition errCode might change async + */ + if (SWOOLE_CHANNEL_OK === $this->channel->errCode) { yield $data; } } while (true); diff --git a/src/SwooleConfiguration.php b/src/SwooleConfiguration.php index c109c29ebf3894f86bf6760f12282033002ff15d..e312b8ad193e1dbefeb3423e8f1406dbedd8b286 100644 --- a/src/SwooleConfiguration.php +++ b/src/SwooleConfiguration.php @@ -4,14 +4,26 @@ declare(strict_types=1); namespace Distantmagic\Resonance; +use SensitiveParameter; + readonly class SwooleConfiguration { + /** + * @psalm-taint-source file $sslCertFile + * @psalm-taint-source file $sslKeyFile + * @psalm-taint-source system_secret $host + * @psalm-taint-source system_secret $port + */ public function __construct( + #[SensitiveParameter] public string $host, public int $logLevel, public bool $logRequests, + #[SensitiveParameter] public int $port, + #[SensitiveParameter] public string $sslCertFile, + #[SensitiveParameter] public string $sslKeyFile, ) {} } diff --git a/src/TranslationsLoader.php b/src/TranslationsLoader.php index fff375d9351eb02c55be3ffc90ee51ddb87162b7..ba9f718eeb751b9101fd5bf0594fed2f80e841a6 100644 --- a/src/TranslationsLoader.php +++ b/src/TranslationsLoader.php @@ -10,6 +10,8 @@ use RuntimeException; readonly class TranslationsLoader { /** + * @psalm-taint-sink file $filename + * * @return Map<string, string> */ public static function load(string $filename): Map @@ -17,6 +19,9 @@ readonly class TranslationsLoader return (new self($filename))->getTranslations(); } + /** + * @psalm-taint-sink file $filename + */ public function __construct(private string $filename) {} /** diff --git a/src/TwigTemplate.php b/src/TwigTemplate.php index 006e87d4c73f8f56b76a446e2d1c583164048695..278640283d90eda698e1365cd0d5719b62197d61 100644 --- a/src/TwigTemplate.php +++ b/src/TwigTemplate.php @@ -11,6 +11,9 @@ use Swoole\Http\Response; #[ContentSecurityPolicy(ContentSecurityPolicyType::Html)] final readonly class TwigTemplate implements HttpInterceptableInterface { + /** + * @psalm-taint-source file $templatePath + */ public function __construct( private string $templatePath, private array $templateData = [], diff --git a/src/WebSocketConfiguration.php b/src/WebSocketConfiguration.php new file mode 100644 index 0000000000000000000000000000000000000000..5908cefaf78ad79dcf6dc8fbac5815cc379bedfc --- /dev/null +++ b/src/WebSocketConfiguration.php @@ -0,0 +1,18 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +use SensitiveParameter; + +readonly class WebSocketConfiguration +{ + /** + * @psalm-taint-source system_secret $maxConnections + */ + public function __construct( + #[SensitiveParameter] + public int $maxConnections, + ) {} +} diff --git a/src/WebSocketConnectionShutdownCommand.php b/src/WebSocketConnectionShutdownCommand.php new file mode 100644 index 0000000000000000000000000000000000000000..672b586e3bf56efc91f282421e224fd1d20ae6d2 --- /dev/null +++ b/src/WebSocketConnectionShutdownCommand.php @@ -0,0 +1,12 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +readonly class WebSocketConnectionShutdownCommand +{ + public function __construct( + public int $fd, + ) {} +} diff --git a/src/WebSocketProtocolController/RPCProtocolController.php b/src/WebSocketProtocolController/RPCProtocolController.php index ecafa31184a59d6977ae0093facce8f7d4939874..c6000eef9431c8659e2bc405c57b8b7c6035a8cf 100644 --- a/src/WebSocketProtocolController/RPCProtocolController.php +++ b/src/WebSocketProtocolController/RPCProtocolController.php @@ -76,7 +76,7 @@ final readonly class RPCProtocolController extends WebSocketProtocolController ); } - public function onClose(Server $server, int $fd): void + public function onClose(int $fd): void { $connectionHandle = $this->connectionHandles->get($fd, null); @@ -88,6 +88,7 @@ final readonly class RPCProtocolController extends WebSocketProtocolController } $connectionHandle->webSocketConnection->status = WebSocketConnectionStatus::Closed; + $connectionHandle->onClose(); $this->webSocketRPCConnectionController?->onClose( $connectionHandle->webSocketAuthResolution, diff --git a/src/WebSocketProtocolControllerInterface.php b/src/WebSocketProtocolControllerInterface.php index 3c603e34b0df007926ea04f007e17b2affae7c8b..f3e49655d595f61a1778ff648bbe8082d7254be8 100644 --- a/src/WebSocketProtocolControllerInterface.php +++ b/src/WebSocketProtocolControllerInterface.php @@ -12,7 +12,7 @@ interface WebSocketProtocolControllerInterface { public function isAuthorizedToConnect(Request $request): WebSocketAuthResolution; - public function onClose(Server $server, int $fd): void; + public function onClose(int $fd): void; public function onMessage(Server $server, Frame $frame): void; diff --git a/src/WebSocketRPCConnectionHandle.php b/src/WebSocketRPCConnectionHandle.php index 06ac51d0c507ea8262462e65708a958b73285734..c001426eb3bcb3e69bb858098a9f38016daca457 100644 --- a/src/WebSocketRPCConnectionHandle.php +++ b/src/WebSocketRPCConnectionHandle.php @@ -5,24 +5,46 @@ declare(strict_types=1); namespace Distantmagic\Resonance; use Distantmagic\Resonance\InputValidatedData\RPCMessage; +use Ds\Set; readonly class WebSocketRPCConnectionHandle { + /** + * @var Set<WebSocketRPCResponderInterface> + */ + private Set $activeResponders; + public function __construct( public WebSocketRPCResponderAggregate $webSocketRPCResponderAggregate, public WebSocketAuthResolution $webSocketAuthResolution, public WebSocketConnection $webSocketConnection, - ) {} + ) { + $this->activeResponders = new Set(); + } - public function onRPCMessage(RPCMessage $rpcMessage): void + public function onClose(): void { - $this - ->webSocketRPCResponderAggregate - ->respond( + foreach ($this->activeResponders as $responder) { + $responder->onClose( $this->webSocketAuthResolution, $this->webSocketConnection, - $rpcMessage - ) + ); + } + } + + public function onRPCMessage(RPCMessage $rpcMessage): void + { + $responder = $this + ->webSocketRPCResponderAggregate + ->selectResponder($rpcMessage) ; + + $this->activeResponders->add($responder); + + $responder->respond( + $this->webSocketAuthResolution, + $this->webSocketConnection, + $rpcMessage + ); } } diff --git a/src/WebSocketRPCResponder.php b/src/WebSocketRPCResponder.php index 199065ccdf488048de78a7e3544c5960a5e5eed0..25c74bfc1a11d5d849a83927cb02268d7badc7a3 100644 --- a/src/WebSocketRPCResponder.php +++ b/src/WebSocketRPCResponder.php @@ -10,6 +10,11 @@ use Distantmagic\Resonance\WebSocketProtocolException\UnexpectedRequest; abstract readonly class WebSocketRPCResponder implements WebSocketRPCResponderInterface { + public function onClose( + WebSocketAuthResolution $webSocketAuthResolution, + WebSocketConnection $webSocketConnection, + ): void {} + public function respond( WebSocketAuthResolution $webSocketAuthResolution, WebSocketConnection $webSocketConnection, diff --git a/src/WebSocketRPCResponderAggregate.php b/src/WebSocketRPCResponderAggregate.php index 9241dcaef3f288d215e7113a97135af01c093040..7457d26571f186de5c412d9143e5dd995be54e71 100644 --- a/src/WebSocketRPCResponderAggregate.php +++ b/src/WebSocketRPCResponderAggregate.php @@ -20,18 +20,7 @@ readonly class WebSocketRPCResponderAggregate $this->rpcResponders = new Map(); } - public function respond( - WebSocketAuthResolution $webSocketAuthResolution, - WebSocketConnection $webSocketConnection, - RPCMessage $rpcMessage, - ): void { - $this - ->selectResponder($rpcMessage) - ->respond($webSocketAuthResolution, $webSocketConnection, $rpcMessage) - ; - } - - private function selectResponder(RPCMessage $rpcMessage): WebSocketRPCResponderInterface + public function selectResponder(RPCMessage $rpcMessage): WebSocketRPCResponderInterface { if (!$this->rpcResponders->hasKey($rpcMessage->method)) { throw new DomainException('Unsupported RPC method: '.$rpcMessage->method->getValue()); diff --git a/src/WebSocketRPCResponderInterface.php b/src/WebSocketRPCResponderInterface.php index 8aadaf9cc1f7cf308f999f8edba19d45e4628ec6..afca1afdce1e6acb4f54fa299f1b7c7d956d938e 100644 --- a/src/WebSocketRPCResponderInterface.php +++ b/src/WebSocketRPCResponderInterface.php @@ -8,6 +8,11 @@ use Distantmagic\Resonance\InputValidatedData\RPCMessage; interface WebSocketRPCResponderInterface { + public function onClose( + WebSocketAuthResolution $webSocketAuthResolution, + WebSocketConnection $webSocketConnection, + ): void; + public function respond( WebSocketAuthResolution $webSocketAuthResolution, WebSocketConnection $webSocketConnection, diff --git a/src/WebSocketServerConnectionTable.php b/src/WebSocketServerConnectionTable.php new file mode 100644 index 0000000000000000000000000000000000000000..5d35c5580cbba4c1028fc2f78f73b78aae8d5f6e --- /dev/null +++ b/src/WebSocketServerConnectionTable.php @@ -0,0 +1,57 @@ +<?php + +declare(strict_types=1); + +namespace Distantmagic\Resonance; + +use Distantmagic\Resonance\Attribute\Singleton; +use Generator; +use IteratorAggregate; +use RuntimeException; +use Swoole\Table; + +/** + * @template-implements IteratorAggregate<int,int> + */ +#[Singleton(grantsFeature: Feature::WebSocket)] +readonly class WebSocketServerConnectionTable implements IteratorAggregate +{ + private Table $table; + + public function __construct(WebSocketConfiguration $webSocketConfiguration) + { + $this->table = new Table($webSocketConfiguration->maxConnections); + $this->table->column('worker_id', Table::TYPE_INT); + $this->table->create(); + } + + /** + * @return Generator<int,int> + */ + public function getIterator(): Generator + { + /** + * @var string $fd + * @var mixed $row explicitly mixed for typechecks + */ + foreach ($this->table as $fd => $row) { + if (is_array($row) && array_key_exists('worker_id', $row)) { + yield (int) $fd => (int) $row['worker_id']; + } else { + throw new RuntimeException('WebSocket table is corrupted'); + } + } + } + + public function registerConnection(int $fd, int $workerId): void + { + $this->table->set((string) $fd, [ + 'worker_id' => $workerId, + ]); + } + + public function unregisterConnection(int $fd): void + { + $this->table->delete((string) $fd); + } +} diff --git a/src/WebSocketServerController.php b/src/WebSocketServerController.php index 4abeead7c5945f55e934905917589ecc7e487699..0a03a1cd37035d1416499b85f052068e85303948 100644 --- a/src/WebSocketServerController.php +++ b/src/WebSocketServerController.php @@ -7,6 +7,7 @@ namespace Distantmagic\Resonance; use Distantmagic\Resonance\Attribute\Singleton; use Ds\Map; use Psr\Log\LoggerInterface; +use RuntimeException; use Swoole\Http\Request; use Swoole\Http\Response; use Swoole\WebSocket\Frame; @@ -34,25 +35,28 @@ final readonly class WebSocketServerController private const SEC_WEBSOCKET_KEY_BASE64_STRLEN = 24; /** - * @var Map<int, WebSocketProtocolControllerInterface> + * @var Map<int,WebSocketProtocolControllerInterface> */ - private Map $protocolControllerAssignments; + private Map $protocolControllers; public function __construct( private LoggerInterface $logger, private WebSocketProtocolControllerAggregate $protocolControllerAggregate, + private WebSocketServerConnectionTable $webSocketServerConnectionTable, ) { - $this->protocolControllerAssignments = new Map(); + $this->protocolControllers = new Map(); } - public function onClose(Server $server, int $fd): void + public function onClose(int $fd): void { - if (!$this->protocolControllerAssignments->hasKey($fd)) { - return; + $this->webSocketServerConnectionTable->unregisterConnection($fd); + + if (!$this->protocolControllers->hasKey($fd)) { + throw new RuntimeException('WebSocket connection is already closed'); } - $this->protocolControllerAssignments->get($fd)->onClose($server, $fd); - $this->protocolControllerAssignments->remove($fd); + $this->protocolControllers->get($fd)->onClose($fd); + $this->protocolControllers->remove($fd); } public function onHandshake(Server $server, Request $request, Response $response): void @@ -99,7 +103,19 @@ final readonly class WebSocketServerController } $fd = $request->fd; - $this->protocolControllerAssignments->put($fd, $controllerResolution->controller); + + $this->protocolControllers->put($fd, $controllerResolution->controller); + + $currentWorkerId = $server->getWorkerId(); + + if (!is_int($currentWorkerId)) { + throw new RuntimeException('WebSocket server needs to be run in a worker'); + } + + $this + ->webSocketServerConnectionTable + ->registerConnection($fd, $currentWorkerId) + ; $secWebSocketAccept = base64_encode(sha1($secWebSocketKey.self::HANDSHAKE_MAGIC_GUID, true)); @@ -117,14 +133,14 @@ final readonly class WebSocketServerController public function onMessage(Server $server, Frame $frame): void { - if (!$this->protocolControllerAssignments->hasKey($frame->fd)) { + $protocolController = $this->protocolControllers->get($frame->fd, null); + + if ($protocolController) { + $protocolController->onMessage($server, $frame); + } else { $this->logger->error(self::MESSAGE_NO_WEBSOCKET_CONTROLLER); $server->disconnect($frame->fd, SWOOLE_WEBSOCKET_CLOSE_SERVER_ERROR); - - return; } - - $this->protocolControllerAssignments->get($frame->fd)->onMessage($server, $frame); } public function onOpen(Server $server, Request $request): void