From 655c5188a4606cda9dadf00094bc4d2ccde52904 Mon Sep 17 00:00:00 2001 From: Trent Larson Date: Mon, 26 May 2025 16:28:33 -0600 Subject: [PATCH] add queueing to the DB service so that requests get processed in order --- src/services/AbsurdSqlDatabaseService.ts | 82 ++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 6 deletions(-) diff --git a/src/services/AbsurdSqlDatabaseService.ts b/src/services/AbsurdSqlDatabaseService.ts index 3db18900..89add11d 100644 --- a/src/services/AbsurdSqlDatabaseService.ts +++ b/src/services/AbsurdSqlDatabaseService.ts @@ -19,11 +19,21 @@ interface AbsurdSqlDatabase { ) => Promise<{ changes: number; lastId?: number }>; } +interface QueuedOperation { + type: 'run' | 'query' | 'getOneRow' | 'getAll'; + sql: string; + params: unknown[]; + resolve: (value: any) => void; + reject: (reason: any) => void; +} + class AbsurdSqlDatabaseService implements DatabaseService { private static instance: AbsurdSqlDatabaseService | null = null; private db: AbsurdSqlDatabase | null; private initialized: boolean; private initializationPromise: Promise | null = null; + private operationQueue: QueuedOperation[] = []; + private isProcessingQueue: boolean = false; private constructor() { this.db = null; @@ -101,6 +111,68 @@ class AbsurdSqlDatabaseService implements DatabaseService { await runMigrations(sqlExec); this.initialized = true; + + // Start processing the queue after initialization + this.processQueue(); + } + + private async processQueue(): Promise { + if (this.isProcessingQueue || !this.initialized || !this.db) { + return; + } + + this.isProcessingQueue = true; + + while (this.operationQueue.length > 0) { + const operation = this.operationQueue.shift(); + if (!operation) continue; + + try { + let result; + switch (operation.type) { + case 'run': + result = await this.db.run(operation.sql, operation.params); + break; + case 'query': + result = await this.db.exec(operation.sql, operation.params); + break; + case 'getOneRow': + const queryResult = await this.db.exec(operation.sql, operation.params); + result = queryResult[0]?.values[0]; + break; + case 'getAll': + const allResult = await this.db.exec(operation.sql, operation.params); + result = allResult[0]?.values || []; + break; + } + operation.resolve(result); + } catch (error) { + operation.reject(error); + } + } + + this.isProcessingQueue = false; + } + + private async queueOperation( + type: QueuedOperation['type'], + sql: string, + params: unknown[] = [], + ): Promise { + return new Promise((resolve, reject) => { + this.operationQueue.push({ + type, + sql, + params, + resolve, + reject, + }); + + // If we're already initialized, start processing the queue + if (this.initialized && this.db) { + this.processQueue(); + } + }); } private async waitForInitialization(): Promise { @@ -133,13 +205,13 @@ class AbsurdSqlDatabaseService implements DatabaseService { params: unknown[] = [], ): Promise<{ changes: number; lastId?: number }> { await this.waitForInitialization(); - return this.db!.run(sql, params); + return this.queueOperation<{ changes: number; lastId?: number }>('run', sql, params); } // Note that the resulting array may be empty if there are no results from the query async query(sql: string, params: unknown[] = []): Promise { await this.waitForInitialization(); - return this.db!.exec(sql, params); + return this.queueOperation('query', sql, params); } async getOneRow( @@ -147,14 +219,12 @@ class AbsurdSqlDatabaseService implements DatabaseService { params: unknown[] = [], ): Promise { await this.waitForInitialization(); - const result = await this.db!.exec(sql, params); - return result[0]?.values[0]; + return this.queueOperation('getOneRow', sql, params); } async getAll(sql: string, params: unknown[] = []): Promise { await this.waitForInitialization(); - const result = await this.db!.exec(sql, params); - return result[0]?.values || []; + return this.queueOperation('getAll', sql, params); } }