From aad32db5e375ecca97f015ed77242f2e3a7e4c8e Mon Sep 17 00:00:00 2001 From: Timothy Carambat <rambat1010@gmail.com> Date: Fri, 16 Feb 2024 16:32:25 -0800 Subject: [PATCH] Migrate document processor to class (#735) * Migrate document processor to class * forgot "new" --- server/endpoints/api/document/index.js | 144 ++++++++++++------------ server/endpoints/extensions/index.js | 37 +++--- server/endpoints/system.js | 9 +- server/endpoints/workspaces.js | 20 ++-- server/swagger/openapi.json | 104 ++++++++--------- server/utils/collectorApi/index.js | 117 +++++++++++++++++++ server/utils/files/documentProcessor.js | 110 ------------------ 7 files changed, 272 insertions(+), 269 deletions(-) create mode 100644 server/utils/collectorApi/index.js delete mode 100644 server/utils/files/documentProcessor.js diff --git a/server/endpoints/api/document/index.js b/server/endpoints/api/document/index.js index c210fff4a..b9e8ecee4 100644 --- a/server/endpoints/api/document/index.js +++ b/server/endpoints/api/document/index.js @@ -1,19 +1,13 @@ const { Telemetry } = require("../../../models/telemetry"); const { validApiKey } = require("../../../utils/middleware/validApiKey"); const { setupMulter } = require("../../../utils/files/multer"); -const { - checkProcessorAlive, - acceptedFileTypes, - processDocument, - processLink, - processRawText, -} = require("../../../utils/files/documentProcessor"); const { viewLocalFiles, findDocumentInDocuments, } = require("../../../utils/files"); const { reqBody } = require("../../../utils/http"); const { EventLogs } = require("../../../models/eventLogs"); +const { CollectorApi } = require("../../../utils/collectorApi"); const { handleUploads } = setupMulter(); function apiDocumentEndpoints(app) { @@ -80,8 +74,9 @@ function apiDocumentEndpoints(app) { } */ try { + const Collector = new CollectorApi(); const { originalname } = request.file; - const processingOnline = await checkProcessorAlive(); + const processingOnline = await Collector.online(); if (!processingOnline) { response @@ -95,7 +90,7 @@ function apiDocumentEndpoints(app) { } const { success, reason, documents } = - await processDocument(originalname); + await Collector.processDocument(originalname); if (!success) { response .status(500) @@ -104,7 +99,7 @@ function apiDocumentEndpoints(app) { return; } - console.log( + Collector.log( `Document ${originalname} uploaded processed and successfully. It is now available in documents.` ); await Telemetry.sendTelemetry("document_uploaded"); @@ -177,8 +172,9 @@ function apiDocumentEndpoints(app) { } */ try { + const Collector = new CollectorApi(); const { link } = reqBody(request); - const processingOnline = await checkProcessorAlive(); + const processingOnline = await Collector.online(); if (!processingOnline) { response @@ -191,7 +187,8 @@ function apiDocumentEndpoints(app) { return; } - const { success, reason, documents } = await processLink(link); + const { success, reason, documents } = + await Collector.processLink(link); if (!success) { response .status(500) @@ -200,7 +197,7 @@ function apiDocumentEndpoints(app) { return; } - console.log( + Collector.log( `Link ${link} uploaded processed and successfully. It is now available in documents.` ); await Telemetry.sendTelemetry("link_uploaded"); @@ -278,9 +275,10 @@ function apiDocumentEndpoints(app) { } */ try { + const Collector = new CollectorApi(); const requiredMetadata = ["title"]; const { textContent, metadata = {} } = reqBody(request); - const processingOnline = await checkProcessorAlive(); + const processingOnline = await Collector.online(); if (!processingOnline) { response @@ -322,7 +320,7 @@ function apiDocumentEndpoints(app) { return; } - const { success, reason, documents } = await processRawText( + const { success, reason, documents } = await Collector.processRawText( textContent, metadata ); @@ -334,7 +332,7 @@ function apiDocumentEndpoints(app) { return; } - console.log( + Collector.log( `Document created successfully. It is now available in documents.` ); await Telemetry.sendTelemetry("raw_document_uploaded"); @@ -391,61 +389,6 @@ function apiDocumentEndpoints(app) { } }); - app.get("/v1/document/:docName", [validApiKey], async (request, response) => { - /* - #swagger.tags = ['Documents'] - #swagger.description = 'Get a single document by its unique AnythingLLM document name' - #swagger.parameters['docName'] = { - in: 'path', - description: 'Unique document name to find (name in /documents)', - required: true, - type: 'string' - } - #swagger.responses[200] = { - content: { - "application/json": { - schema: { - type: 'object', - example: { - "localFiles": { - "name": "documents", - "type": "folder", - items: [ - { - "name": "my-stored-document.txt-uuid1234.json", - "type": "file", - "id": "bb07c334-4dab-4419-9462-9d00065a49a1", - "url": "file://my-stored-document.txt", - "title": "my-stored-document.txt", - "cached": false - }, - ] - } - } - } - } - } - } - #swagger.responses[403] = { - schema: { - "$ref": "#/definitions/InvalidAPIKey" - } - } - */ - try { - const { docName } = request.params; - const document = await findDocumentInDocuments(docName); - if (!document) { - response.sendStatus(404).end(); - return; - } - response.status(200).json({ document }); - } catch (e) { - console.log(e.message, e); - response.sendStatus(500).end(); - } - }); - app.get( "/v1/document/accepted-file-types", [validApiKey], @@ -489,7 +432,7 @@ function apiDocumentEndpoints(app) { } */ try { - const types = await acceptedFileTypes(); + const types = await new CollectorApi().acceptedFileTypes(); if (!types) { response.sendStatus(404).end(); return; @@ -552,6 +495,63 @@ function apiDocumentEndpoints(app) { } } ); + + // Be careful and place as last route to prevent override of the other /document/ GET + // endpoints! + app.get("/v1/document/:docName", [validApiKey], async (request, response) => { + /* + #swagger.tags = ['Documents'] + #swagger.description = 'Get a single document by its unique AnythingLLM document name' + #swagger.parameters['docName'] = { + in: 'path', + description: 'Unique document name to find (name in /documents)', + required: true, + type: 'string' + } + #swagger.responses[200] = { + content: { + "application/json": { + schema: { + type: 'object', + example: { + "localFiles": { + "name": "documents", + "type": "folder", + items: [ + { + "name": "my-stored-document.txt-uuid1234.json", + "type": "file", + "id": "bb07c334-4dab-4419-9462-9d00065a49a1", + "url": "file://my-stored-document.txt", + "title": "my-stored-document.txt", + "cached": false + }, + ] + } + } + } + } + } + } + #swagger.responses[403] = { + schema: { + "$ref": "#/definitions/InvalidAPIKey" + } + } + */ + try { + const { docName } = request.params; + const document = await findDocumentInDocuments(docName); + if (!document) { + response.sendStatus(404).end(); + return; + } + response.status(200).json({ document }); + } catch (e) { + console.log(e.message, e); + response.sendStatus(500).end(); + } + }); } module.exports = { apiDocumentEndpoints }; diff --git a/server/endpoints/extensions/index.js b/server/endpoints/extensions/index.js index a2c884a01..bf07ec56c 100644 --- a/server/endpoints/extensions/index.js +++ b/server/endpoints/extensions/index.js @@ -1,7 +1,5 @@ const { Telemetry } = require("../../models/telemetry"); -const { - forwardExtensionRequest, -} = require("../../utils/files/documentProcessor"); +const { CollectorApi } = require("../../utils/collectorApi"); const { flexUserRoleValid, ROLES, @@ -16,11 +14,12 @@ function extensionEndpoints(app) { [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])], async (request, response) => { try { - const responseFromProcessor = await forwardExtensionRequest({ - endpoint: "/ext/github-repo/branches", - method: "POST", - body: request.body, - }); + const responseFromProcessor = + await new CollectorApi().forwardExtensionRequest({ + endpoint: "/ext/github-repo/branches", + method: "POST", + body: request.body, + }); response.status(200).json(responseFromProcessor); } catch (e) { console.error(e); @@ -34,11 +33,12 @@ function extensionEndpoints(app) { [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])], async (request, response) => { try { - const responseFromProcessor = await forwardExtensionRequest({ - endpoint: "/ext/github-repo", - method: "POST", - body: request.body, - }); + const responseFromProcessor = + await new CollectorApi().forwardExtensionRequest({ + endpoint: "/ext/github-repo", + method: "POST", + body: request.body, + }); await Telemetry.sendTelemetry("extension_invoked", { type: "github_repo", }); @@ -55,11 +55,12 @@ function extensionEndpoints(app) { [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])], async (request, response) => { try { - const responseFromProcessor = await forwardExtensionRequest({ - endpoint: "/ext/youtube-transcript", - method: "POST", - body: request.body, - }); + const responseFromProcessor = + await new CollectorApi().forwardExtensionRequest({ + endpoint: "/ext/youtube-transcript", + method: "POST", + body: request.body, + }); await Telemetry.sendTelemetry("extension_invoked", { type: "youtube_transcript", }); diff --git a/server/endpoints/system.js b/server/endpoints/system.js index 50e63ef9e..02d786176 100644 --- a/server/endpoints/system.js +++ b/server/endpoints/system.js @@ -2,10 +2,6 @@ process.env.NODE_ENV === "development" ? require("dotenv").config({ path: `.env.${process.env.NODE_ENV}` }) : require("dotenv").config(); const { viewLocalFiles, normalizePath } = require("../utils/files"); -const { - checkProcessorAlive, - acceptedFileTypes, -} = require("../utils/files/documentProcessor"); const { purgeDocument, purgeFolder } = require("../utils/files/purgeDocument"); const { getVectorDbClass } = require("../utils/helpers"); const { updateENV, dumpENV } = require("../utils/helpers/updateENV"); @@ -49,6 +45,7 @@ const { exportChatsAsType, } = require("../utils/helpers/chat/convertTo"); const { EventLogs } = require("../models/eventLogs"); +const { CollectorApi } = require("../utils/collectorApi"); function systemEndpoints(app) { if (!app) return; @@ -297,7 +294,7 @@ function systemEndpoints(app) { [validatedRequest], async (_, response) => { try { - const online = await checkProcessorAlive(); + const online = await new CollectorApi().online(); response.sendStatus(online ? 200 : 503); } catch (e) { console.log(e.message, e); @@ -311,7 +308,7 @@ function systemEndpoints(app) { [validatedRequest], async (_, response) => { try { - const types = await acceptedFileTypes(); + const types = await new CollectorApi().acceptedFileTypes(); if (!types) { response.sendStatus(404).end(); return; diff --git a/server/endpoints/workspaces.js b/server/endpoints/workspaces.js index 944be1de7..82040272a 100644 --- a/server/endpoints/workspaces.js +++ b/server/endpoints/workspaces.js @@ -5,11 +5,6 @@ const { DocumentVectors } = require("../models/vectors"); const { WorkspaceChats } = require("../models/workspaceChats"); const { getVectorDbClass } = require("../utils/helpers"); const { setupMulter } = require("../utils/files/multer"); -const { - checkProcessorAlive, - processDocument, - processLink, -} = require("../utils/files/documentProcessor"); const { validatedRequest } = require("../utils/middleware/validatedRequest"); const { Telemetry } = require("../models/telemetry"); const { @@ -22,6 +17,7 @@ const { } = require("../models/workspacesSuggestedMessages"); const { validWorkspaceSlug } = require("../utils/middleware/validWorkspace"); const { convertToChatHistory } = require("../utils/helpers/chat/responses"); +const { CollectorApi } = require("../utils/collectorApi"); const { handleUploads } = setupMulter(); function workspaceEndpoints(app) { @@ -98,8 +94,9 @@ function workspaceEndpoints(app) { [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])], handleUploads.single("file"), async function (request, response) { + const Collector = new CollectorApi(); const { originalname } = request.file; - const processingOnline = await checkProcessorAlive(); + const processingOnline = await Collector.online(); if (!processingOnline) { response @@ -112,13 +109,13 @@ function workspaceEndpoints(app) { return; } - const { success, reason } = await processDocument(originalname); + const { success, reason } = await Collector.processDocument(originalname); if (!success) { response.status(500).json({ success: false, error: reason }).end(); return; } - console.log( + Collector.log( `Document ${originalname} uploaded processed and successfully. It is now available in documents.` ); await Telemetry.sendTelemetry("document_uploaded"); @@ -137,8 +134,9 @@ function workspaceEndpoints(app) { "/workspace/:slug/upload-link", [validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])], async (request, response) => { + const Collector = new CollectorApi(); const { link = "" } = reqBody(request); - const processingOnline = await checkProcessorAlive(); + const processingOnline = await Collector.online(); if (!processingOnline) { response @@ -151,13 +149,13 @@ function workspaceEndpoints(app) { return; } - const { success, reason } = await processLink(link); + const { success, reason } = await Collector.processLink(link); if (!success) { response.status(500).json({ success: false, error: reason }).end(); return; } - console.log( + Collector.log( `Link ${link} uploaded processed and successfully. It is now available in documents.` ); await Telemetry.sendTelemetry("link_uploaded"); diff --git a/server/swagger/openapi.json b/server/swagger/openapi.json index 4554a7aad..b300e3fa7 100644 --- a/server/swagger/openapi.json +++ b/server/swagger/openapi.json @@ -1140,22 +1140,13 @@ } } }, - "/v1/document/{docName}": { + "/v1/document/accepted-file-types": { "get": { "tags": [ "Documents" ], - "description": "Get a single document by its unique AnythingLLM document name", + "description": "Check available filetypes and MIMEs that can be uploaded.", "parameters": [ - { - "name": "docName", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "Unique document name to find (name in /documents)" - }, { "name": "Authorization", "in": "header", @@ -1172,18 +1163,22 @@ "schema": { "type": "object", "example": { - "localFiles": { - "name": "documents", - "type": "folder", - "items": [ - { - "name": "my-stored-document.txt-uuid1234.json", - "type": "file", - "id": "bb07c334-4dab-4419-9462-9d00065a49a1", - "url": "file://my-stored-document.txt", - "title": "my-stored-document.txt", - "cached": false - } + "types": { + "application/mbox": [ + ".mbox" + ], + "application/pdf": [ + ".pdf" + ], + "application/vnd.oasis.opendocument.text": [ + ".odt" + ], + "application/vnd.openxmlformats-officedocument.wordprocessingml.document": [ + ".docx" + ], + "text/plain": [ + ".txt", + ".md" ] } } @@ -1215,12 +1210,12 @@ } } }, - "/v1/document/accepted-file-types": { + "/v1/document/metadata-schema": { "get": { "tags": [ "Documents" ], - "description": "Check available filetypes and MIMEs that can be uploaded.", + "description": "Get the known available metadata schema for when doing a raw-text upload and the acceptable type of value for each key.", "parameters": [ { "name": "Authorization", @@ -1238,23 +1233,11 @@ "schema": { "type": "object", "example": { - "types": { - "application/mbox": [ - ".mbox" - ], - "application/pdf": [ - ".pdf" - ], - "application/vnd.oasis.opendocument.text": [ - ".odt" - ], - "application/vnd.openxmlformats-officedocument.wordprocessingml.document": [ - ".docx" - ], - "text/plain": [ - ".txt", - ".md" - ] + "schema": { + "keyOne": "string | number | nullable", + "keyTwo": "string | number | nullable", + "specialKey": "number", + "title": "string" } } } @@ -1276,22 +1259,28 @@ } } }, - "404": { - "description": "Not Found" - }, "500": { "description": "Internal Server Error" } } } }, - "/v1/document/metadata-schema": { + "/v1/document/{docName}": { "get": { "tags": [ "Documents" ], - "description": "Get the known available metadata schema for when doing a raw-text upload and the acceptable type of value for each key.", + "description": "Get a single document by its unique AnythingLLM document name", "parameters": [ + { + "name": "docName", + "in": "path", + "required": true, + "schema": { + "type": "string" + }, + "description": "Unique document name to find (name in /documents)" + }, { "name": "Authorization", "in": "header", @@ -1308,11 +1297,19 @@ "schema": { "type": "object", "example": { - "schema": { - "keyOne": "string | number | nullable", - "keyTwo": "string | number | nullable", - "specialKey": "number", - "title": "string" + "localFiles": { + "name": "documents", + "type": "folder", + "items": [ + { + "name": "my-stored-document.txt-uuid1234.json", + "type": "file", + "id": "bb07c334-4dab-4419-9462-9d00065a49a1", + "url": "file://my-stored-document.txt", + "title": "my-stored-document.txt", + "cached": false + } + ] } } } @@ -1334,6 +1331,9 @@ } } }, + "404": { + "description": "Not Found" + }, "500": { "description": "Internal Server Error" } diff --git a/server/utils/collectorApi/index.js b/server/utils/collectorApi/index.js new file mode 100644 index 000000000..7e8c11493 --- /dev/null +++ b/server/utils/collectorApi/index.js @@ -0,0 +1,117 @@ +// When running locally will occupy the 0.0.0.0 hostname space but when deployed inside +// of docker this endpoint is not exposed so it is only on the Docker instances internal network +// so no additional security is needed on the endpoint directly. Auth is done however by the express +// middleware prior to leaving the node-side of the application so that is good enough >:) + +class CollectorApi { + constructor() { + this.endpoint = "http://0.0.0.0:8888"; + } + + log(text, ...args) { + console.log(`\x1b[36m[CollectorApi]\x1b[0m ${text}`, ...args); + } + + async online() { + return await fetch(this.endpoint) + .then((res) => res.ok) + .catch(() => false); + } + + async acceptedFileTypes() { + return await fetch(`${this.endpoint}/accepts`) + .then((res) => { + if (!res.ok) throw new Error("failed to GET /accepts"); + return res.json(); + }) + .then((res) => res) + .catch((e) => { + this.log(e.message); + return null; + }); + } + + async processDocument(filename = "") { + if (!filename) return false; + return await fetch(`${this.endpoint}/process`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ filename }), + }) + .then((res) => { + if (!res.ok) throw new Error("Response could not be completed"); + return res.json(); + }) + .then((res) => res) + .catch((e) => { + this.log(e.message); + return { success: false, reason: e.message, documents: [] }; + }); + } + + async processLink(link = "") { + if (!link) return false; + + return await fetch(`${this.endpoint}/process-link`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ link }), + }) + .then((res) => { + if (!res.ok) throw new Error("Response could not be completed"); + return res.json(); + }) + .then((res) => res) + .catch((e) => { + this.log(e.message); + return { success: false, reason: e.message, documents: [] }; + }); + } + + async processRawText(textContent = "", metadata = {}) { + return await fetch(`${this.endpoint}/process-raw-text`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ textContent, metadata }), + }) + .then((res) => { + if (!res.ok) throw new Error("Response could not be completed"); + return res.json(); + }) + .then((res) => res) + .catch((e) => { + this.log(e.message); + return { success: false, reason: e.message, documents: [] }; + }); + } + + // We will not ever expose the document processor to the frontend API so instead we relay + // all requests through the server. You can use this function to directly expose a specific endpoint + // on the document processor. + async forwardExtensionRequest({ endpoint, method, body }) { + return await fetch(`${this.endpoint}${endpoint}`, { + method, + body, // Stringified JSON! + headers: { + "Content-Type": "application/json", + }, + }) + .then((res) => { + if (!res.ok) throw new Error("Response could not be completed"); + return res.json(); + }) + .then((res) => res) + .catch((e) => { + this.log(e.message); + return { success: false, data: {}, reason: e.message }; + }); + } +} + +module.exports.CollectorApi = CollectorApi; diff --git a/server/utils/files/documentProcessor.js b/server/utils/files/documentProcessor.js deleted file mode 100644 index ef8eba17c..000000000 --- a/server/utils/files/documentProcessor.js +++ /dev/null @@ -1,110 +0,0 @@ -// When running locally will occupy the 0.0.0.0 hostname space but when deployed inside -// of docker this endpoint is not exposed so it is only on the Docker instances internal network -// so no additional security is needed on the endpoint directly. Auth is done however by the express -// middleware prior to leaving the node-side of the application so that is good enough >:) -const PROCESSOR_API = "http://0.0.0.0:8888"; -async function checkProcessorAlive() { - return await fetch(`${PROCESSOR_API}`) - .then((res) => res.ok) - .catch((e) => false); -} - -async function acceptedFileTypes() { - return await fetch(`${PROCESSOR_API}/accepts`) - .then((res) => { - if (!res.ok) throw new Error("Could not reach"); - return res.json(); - }) - .then((res) => res) - .catch(() => null); -} - -async function processDocument(filename = "") { - if (!filename) return false; - return await fetch(`${PROCESSOR_API}/process`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ filename }), - }) - .then((res) => { - if (!res.ok) throw new Error("Response could not be completed"); - return res.json(); - }) - .then((res) => res) - .catch((e) => { - console.log(e.message); - return { success: false, reason: e.message, documents: [] }; - }); -} - -async function processLink(link = "") { - if (!link) return false; - return await fetch(`${PROCESSOR_API}/process-link`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ link }), - }) - .then((res) => { - if (!res.ok) throw new Error("Response could not be completed"); - return res.json(); - }) - .then((res) => res) - .catch((e) => { - console.log(e.message); - return { success: false, reason: e.message, documents: [] }; - }); -} - -async function processRawText(textContent = "", metadata = {}) { - return await fetch(`${PROCESSOR_API}/process-raw-text`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ textContent, metadata }), - }) - .then((res) => { - if (!res.ok) throw new Error("Response could not be completed"); - return res.json(); - }) - .then((res) => res) - .catch((e) => { - console.log(e.message); - return { success: false, reason: e.message, documents: [] }; - }); -} - -// We will not ever expose the document processor to the frontend API so instead we relay -// all requests through the server. You can use this function to directly expose a specific endpoint -// on the document processor. -async function forwardExtensionRequest({ endpoint, method, body }) { - return await fetch(`${PROCESSOR_API}${endpoint}`, { - method, - body, // Stringified JSON! - headers: { - "Content-Type": "application/json", - }, - }) - .then((res) => { - if (!res.ok) throw new Error("Response could not be completed"); - return res.json(); - }) - .then((res) => res) - .catch((e) => { - console.log(e.message); - return { success: false, data: {}, reason: e.message }; - }); -} - -module.exports = { - checkProcessorAlive, - processDocument, - processLink, - processRawText, - acceptedFileTypes, - forwardExtensionRequest, -}; -- GitLab