From e56157cb8cd4bdb3239925d1ff3fd79e74857cfb Mon Sep 17 00:00:00 2001
From: Mateusz Charytoniuk <mateusz.charytoniuk@protonmail.com>
Date: Mon, 25 Mar 2024 16:41:06 +0100
Subject: [PATCH] chore: document observable task table

---
 composer.json                                 |   3 +-
 composer.lock                                 |  65 +++++++++-
 .../docs/features/observability/index.md      |  13 ++
 .../observable-task-table/index.md            | 122 ++++++++++++++++++
 .../ObservableTasksDashboard.php              |  33 -----
 src/ObservableTaskTable.php                   |  23 ----
 src/ObservableTaskTableTest.php               |  21 ---
 src/SiteAction.php                            |   1 +
 .../LlamaCppSubjectActionPromptResponder.php  |   2 +-
 .../ObservableTasksTableUpdateResponder.php   |  54 --------
 src/views/observable_tasks_dashboard.twig     |  32 -----
 11 files changed, 203 insertions(+), 166 deletions(-)
 create mode 100644 docs/pages/docs/features/observability/index.md
 create mode 100644 docs/pages/docs/features/observability/observable-task-table/index.md
 delete mode 100644 src/HttpResponder/ObservableTasksDashboard.php
 delete mode 100644 src/WebSocketJsonRPCResponder/ObservableTasksTableUpdateResponder.php
 delete mode 100644 src/views/observable_tasks_dashboard.twig

diff --git a/composer.json b/composer.json
index 31379541..0828a515 100644
--- a/composer.json
+++ b/composer.json
@@ -56,7 +56,8 @@
         "symfony/uid": "^7.0",
         "symfony/doctrine-bridge": "^7.0",
         "bref/bref": "^2.1",
-        "symfony/http-foundation": "^7.0"
+        "symfony/http-foundation": "^7.0",
+        "symfony/expression-language": "^7.0"
     },
     "require-dev": {
         "phpunit/phpunit": "^11.0",
diff --git a/composer.lock b/composer.lock
index 81efe8ae..041c89f9 100644
--- a/composer.lock
+++ b/composer.lock
@@ -4,7 +4,7 @@
         "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
         "This file is @generated automatically"
     ],
-    "content-hash": "a4aa534fc5b3b730ea6124cfcff9b223",
+    "content-hash": "ef6348bdeee54a23ab896acae84599e3",
     "packages": [
         {
             "name": "amphp/amp",
@@ -6449,6 +6449,69 @@
             ],
             "time": "2023-05-23T14:45:45+00:00"
         },
+        {
+            "name": "symfony/expression-language",
+            "version": "v7.0.3",
+            "source": {
+                "type": "git",
+                "url": "https://github.com/symfony/expression-language.git",
+                "reference": "0877c599cb260c9614f9229c0a2090d6919fd621"
+            },
+            "dist": {
+                "type": "zip",
+                "url": "https://api.github.com/repos/symfony/expression-language/zipball/0877c599cb260c9614f9229c0a2090d6919fd621",
+                "reference": "0877c599cb260c9614f9229c0a2090d6919fd621",
+                "shasum": ""
+            },
+            "require": {
+                "php": ">=8.2",
+                "symfony/cache": "^6.4|^7.0",
+                "symfony/service-contracts": "^2.5|^3"
+            },
+            "type": "library",
+            "autoload": {
+                "psr-4": {
+                    "Symfony\\Component\\ExpressionLanguage\\": ""
+                },
+                "exclude-from-classmap": [
+                    "/Tests/"
+                ]
+            },
+            "notification-url": "https://packagist.org/downloads/",
+            "license": [
+                "MIT"
+            ],
+            "authors": [
+                {
+                    "name": "Fabien Potencier",
+                    "email": "fabien@symfony.com"
+                },
+                {
+                    "name": "Symfony Community",
+                    "homepage": "https://symfony.com/contributors"
+                }
+            ],
+            "description": "Provides an engine that can compile and evaluate expressions",
+            "homepage": "https://symfony.com",
+            "support": {
+                "source": "https://github.com/symfony/expression-language/tree/v7.0.3"
+            },
+            "funding": [
+                {
+                    "url": "https://symfony.com/sponsor",
+                    "type": "custom"
+                },
+                {
+                    "url": "https://github.com/fabpot",
+                    "type": "github"
+                },
+                {
+                    "url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
+                    "type": "tidelift"
+                }
+            ],
+            "time": "2024-01-23T15:02:46+00:00"
+        },
         {
             "name": "symfony/filesystem",
             "version": "v7.0.3",
diff --git a/docs/pages/docs/features/observability/index.md b/docs/pages/docs/features/observability/index.md
new file mode 100644
index 00000000..981638ae
--- /dev/null
+++ b/docs/pages/docs/features/observability/index.md
@@ -0,0 +1,13 @@
+---
+collections: 
+    - documents
+layout: dm:document
+parent: docs/features/index
+title: Observability
+description: >
+    Observability is the ability to understand the state of a running system.
+---
+
+# Observability
+
+{{docs/features/observability/*/index}}
diff --git a/docs/pages/docs/features/observability/observable-task-table/index.md b/docs/pages/docs/features/observability/observable-task-table/index.md
new file mode 100644
index 00000000..0f23b9aa
--- /dev/null
+++ b/docs/pages/docs/features/observability/observable-task-table/index.md
@@ -0,0 +1,122 @@
+---
+collections: 
+    - documents
+layout: dm:document
+parent: docs/features/observability/index
+title: Observable Task Table
+description: >
+    Observable task table is a shared memory table that stores the status of 
+    observable tasks. It is used to observe the status of long-running tasks.
+---
+
+# Observable Task Table
+
+Observable task table is a shared memory table that stores the status of 
+observable tasks. It is used to observe the status of long-running tasks.
+
+# Usage
+
+## Updating Status of Observable Task
+
+Observable tasks can be observed by the `ObservableTaskTable` service. 
+
+The `ObservableTask` class is used to define the task to be observed. It's
+callback function needs to return a generator. The generator should yield 
+`ObservableTaskStatusUpdate` instances to update the status of the task.
+
+```php
+$this->observableTaskTable->observe(new ObservableTask(
+    iterableTask: function (): Generator {
+        yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Running, null);
+
+        Coroutine::sleep(3);
+
+        yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Finished, null);
+    },
+    name: 'test',
+    category: 'test_tasks',
+));
+```
+
+## Observing Task with Timeout
+
+The `ObservableTaskTimeoutIterator` class is used to define a timeout for the 
+task. If the task is inactive for the specified time, the task will be 
+cancelled.
+
+You can use it by composing it with an observable task:
+
+```php
+$observableTaskTimeout = new ObservableTaskTimeoutIterator(
+    iterableTask: function () use (
+        $webSocketAuthResolution,
+        $webSocketConnection,
+        $rpcRequest,
+    ): Generator {
+        yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Running, null);
+
+        Coroutine::sleep(3);
+
+        yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Finished, null);
+    },
+    inactivityTimeout: 5.0,
+);
+
+$this->observableTaskTable->observe(new ObservableTask(
+    iterableTask: $observableTaskTimeout,
+    name: 'test',
+    category: 'test_tasks',
+));
+```
+
+# Rendering
+
+Observable tasks table is iterable and uses shared memory. It is possible to
+iterate over it in any process to get the accurate state of currently running
+tasks.
+
+```php file:app/HttpResponder/ObservableTasksDashboard.php
+#[RespondsToHttp(
+    method: RequestMethod::GET,
+    pattern: '/observable_task_table',
+)]
+#[Singleton(collection: SingletonCollection::HttpResponder)]
+final readonly class ObservableTasksDashboard extends HttpResponder
+{
+    public function __construct(
+        private ObservableTaskTable $observableTaskTable,
+    ) {}
+
+    public function respond(ServerRequestInterface $request, ResponseInterface $response): HttpInterceptableInterface
+    {
+        return new TwigTemplate($request, $response, 'observable_task_table.twig',[
+            'observableTaskTable' => $this->observableTaskTable,
+        ]);
+    }
+}
+```
+
+```twig file:app/views/observable_task_table.twig
+<table>
+    <thead>
+        <tr>
+            <th>slot</th>
+            <th>status</th>
+            <th>category</th>
+            <th>name</th>
+            <th>last update</th>
+        </tr>
+    </thead>
+    <tbody>
+        {% for slotId, observableTask in observableTaskTable %}
+            <tr>
+                <td>{{ slotId }}</td>
+                <td>{{ observableTask.observableTaskStatusUpdate.status.value }}</td>
+                <td>{{ observableTask.category }}</td>
+                <td>{{ observableTask.name }}</td>
+                <td>{{ observableTask.modifiedAt|intl_format_date(request) }}</td>
+            </tr>
+        {% endfor %}
+    </tbody>
+</table>
+```
diff --git a/src/HttpResponder/ObservableTasksDashboard.php b/src/HttpResponder/ObservableTasksDashboard.php
deleted file mode 100644
index 1b91fc40..00000000
--- a/src/HttpResponder/ObservableTasksDashboard.php
+++ /dev/null
@@ -1,33 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Distantmagic\Resonance\HttpResponder;
-
-use Distantmagic\Resonance\Attribute\Singleton;
-use Distantmagic\Resonance\HttpInterceptableInterface;
-use Distantmagic\Resonance\HttpResponder;
-use Distantmagic\Resonance\ObservableTaskTable;
-use Distantmagic\Resonance\TwigTemplate;
-use Psr\Http\Message\ResponseInterface;
-use Psr\Http\Message\ServerRequestInterface;
-
-#[Singleton]
-readonly class ObservableTasksDashboard extends HttpResponder
-{
-    public function __construct(
-        private ObservableTaskTable $observableTaskTable,
-    ) {}
-
-    public function respond(ServerRequestInterface $request, ResponseInterface $response): HttpInterceptableInterface
-    {
-        return new TwigTemplate(
-            $request,
-            $response,
-            '@resonance/observable_tasks_dashboard.twig',
-            [
-                'observableTaskTable' => $this->observableTaskTable,
-            ]
-        );
-    }
-}
diff --git a/src/ObservableTaskTable.php b/src/ObservableTaskTable.php
index d59d1909..5319b5a3 100644
--- a/src/ObservableTaskTable.php
+++ b/src/ObservableTaskTable.php
@@ -6,7 +6,6 @@ namespace Distantmagic\Resonance;
 
 use DateTimeImmutable;
 use Distantmagic\Resonance\Attribute\Singleton;
-use Ds\Set;
 use Generator;
 use IteratorAggregate;
 use RuntimeException;
@@ -19,11 +18,6 @@ use Swoole\Table;
 #[Singleton]
 readonly class ObservableTaskTable implements IteratorAggregate
 {
-    /**
-     * @var Set<callable(ObservableTaskSlotStatusUpdate):bool>
-     */
-    public Set $observers;
-
     private SwooleTableAvailableRowsPool $availableRowsPool;
     private string $serializedPendingStatus;
     private Table $table;
@@ -33,7 +27,6 @@ readonly class ObservableTaskTable implements IteratorAggregate
         private SerializerInterface $serializer,
     ) {
         $this->availableRowsPool = new SwooleTableAvailableRowsPool($observableTaskConfiguration->maxTasks);
-        $this->observers = new Set();
         $this->serializedPendingStatus = $serializer->serialize(
             new ObservableTaskStatusUpdate(ObservableTaskStatus::Pending, null)
         );
@@ -113,22 +106,6 @@ readonly class ObservableTaskTable implements IteratorAggregate
                     throw new RuntimeException('Unable to update a slot status.');
                 }
 
-                if (!$this->observers->isEmpty()) {
-                    $slotStatusUpdate = new ObservableTaskSlotStatusUpdate($slotId, $statusUpdate);
-
-                    foreach ($this->observers as $observer) {
-                        if (!is_callable($observer)) {
-                            throw new RuntimeException('Observer is not callable');
-                        }
-
-                        SwooleCoroutineHelper::mustGo(function () use ($observer, $slotStatusUpdate) {
-                            if (false === $observer($slotStatusUpdate)) {
-                                $this->observers->remove($observer);
-                            }
-                        });
-                    }
-                }
-
                 if ($statusUpdate->status->isFinal()) {
                     break;
                 }
diff --git a/src/ObservableTaskTableTest.php b/src/ObservableTaskTableTest.php
index f0810c55..06e9cf96 100644
--- a/src/ObservableTaskTableTest.php
+++ b/src/ObservableTaskTableTest.php
@@ -36,27 +36,6 @@ final class ObservableTaskTableTest extends TestCase
         Event::wait();
     }
 
-    public function test_channel_is_observed(): void
-    {
-        self::assertNotNull($this->observableTaskTable);
-
-        $this->observableTaskTable->observers->add(static function (ObservableTaskSlotStatusUpdate $status): bool {
-            return ObservableTaskStatus::Finished === $status->observableTaskStatusUpdate->status;
-        });
-
-        $this->observableTaskTable->observe(new ObservableTask(static function () {
-            yield new ObservableTaskStatusUpdate(
-                ObservableTaskStatus::Running,
-                'test1',
-            );
-
-            yield new ObservableTaskStatusUpdate(
-                ObservableTaskStatus::Finished,
-                'test2',
-            );
-        }));
-    }
-
     public function test_task_is_observed(): void
     {
         self::assertNotNull($this->observableTaskTable);
diff --git a/src/SiteAction.php b/src/SiteAction.php
index b076b752..d6b0ebce 100644
--- a/src/SiteAction.php
+++ b/src/SiteAction.php
@@ -11,4 +11,5 @@ enum SiteAction implements SiteActionInterface
     case StartWebSocketJsonRPCConnection;
     case UseGraphQL;
     case UseOAuth2;
+    case ViewObservableTaskTable;
 }
diff --git a/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php b/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php
index 862c3d4b..9a696af8 100644
--- a/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php
+++ b/src/WebSocketJsonRPCResponder/LlamaCppSubjectActionPromptResponder.php
@@ -105,7 +105,7 @@ abstract readonly class LlamaCppSubjectActionPromptResponder extends WebSocketJs
                         yield new ObservableTaskStatusUpdate(ObservableTaskStatus::Finished, null);
                     }
                 },
-                inactivityTimeout: 3.0,
+                inactivityTimeout: 5.0,
             ),
             name: 'websocket_jsonrpc_response',
             category: 'llama_cpp',
diff --git a/src/WebSocketJsonRPCResponder/ObservableTasksTableUpdateResponder.php b/src/WebSocketJsonRPCResponder/ObservableTasksTableUpdateResponder.php
deleted file mode 100644
index 5d2002c1..00000000
--- a/src/WebSocketJsonRPCResponder/ObservableTasksTableUpdateResponder.php
+++ /dev/null
@@ -1,54 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace Distantmagic\Resonance\WebSocketJsonRPCResponder;
-
-use Distantmagic\Resonance\Constraint;
-use Distantmagic\Resonance\Constraint\ObjectConstraint;
-use Distantmagic\Resonance\JsonRPCRequest;
-use Distantmagic\Resonance\JsonRPCResponse;
-use Distantmagic\Resonance\ObservableTaskSlotStatusUpdate;
-use Distantmagic\Resonance\ObservableTaskTable;
-use Distantmagic\Resonance\WebSocketAuthResolution;
-use Distantmagic\Resonance\WebSocketConnection;
-use Distantmagic\Resonance\WebSocketJsonRPCResponder;
-
-/**
- * @template TPayload
- *
- * @template-extends WebSocketJsonRPCResponder<TPayload>
- */
-readonly class ObservableTasksTableUpdateResponder extends WebSocketJsonRPCResponder
-{
-    public function __construct(
-        private ObservableTaskTable $observableTaskTable,
-    ) {}
-
-    public function getConstraint(): Constraint
-    {
-        return new ObjectConstraint();
-    }
-
-    public function onRequest(
-        WebSocketAuthResolution $webSocketAuthResolution,
-        WebSocketConnection $webSocketConnection,
-        JsonRPCRequest $rpcRequest,
-    ): void {
-        $this->observableTaskTable->observers->add(
-            static function (ObservableTaskSlotStatusUpdate $observableTaskSlotStatusUpdate) use (
-                $rpcRequest,
-                $webSocketConnection,
-            ): bool {
-                if (!$webSocketConnection->status->isOpen()) {
-                    return false;
-                }
-
-                return $webSocketConnection->push(new JsonRPCResponse(
-                    rpcRequest: $rpcRequest,
-                    content: $observableTaskSlotStatusUpdate,
-                ));
-            }
-        );
-    }
-}
diff --git a/src/views/observable_tasks_dashboard.twig b/src/views/observable_tasks_dashboard.twig
deleted file mode 100644
index 0d85c4c3..00000000
--- a/src/views/observable_tasks_dashboard.twig
+++ /dev/null
@@ -1,32 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<head>
-    <meta charset="utf-8">
-    <meta name="viewport" content="width=device-width, initial-scale=1">
-    <title></title>
-</head>
-<body>
-    <table>
-        <thead>
-            <tr>
-                <th>slot</th>
-                <th>status</th>
-                <th>category</th>
-                <th>name</th>
-                <th>last update</th>
-            </tr>
-        </thead>
-        <tbody>
-            {% for slotId, observableTask in observableTaskTable %}
-                <tr>
-                    <td>{{ slotId }}</td>
-                    <td>{{ observableTask.observableTaskStatusUpdate.status.value }}</td>
-                    <td>{{ observableTask.category }}</td>
-                    <td>{{ observableTask.name }}</td>
-                    <td>{{ observableTask.modifiedAt|intl_format_date(request) }}</td>
-                </tr>
-            {% endfor %}
-        </tbody>
-    </table>
-</body>
-</html>
-- 
GitLab