Browse Source

add queueing to the DB service so that requests get processed in order

pull/137/head
Trent Larson 2 weeks ago
parent
commit
655c5188a4
  1. 82
      src/services/AbsurdSqlDatabaseService.ts

82
src/services/AbsurdSqlDatabaseService.ts

@ -19,11 +19,21 @@ interface AbsurdSqlDatabase {
) => Promise<{ changes: number; lastId?: number }>;
}
interface QueuedOperation {
type: 'run' | 'query' | 'getOneRow' | 'getAll';
sql: string;
params: unknown[];
resolve: (value: any) => void;
reject: (reason: any) => void;
}
class AbsurdSqlDatabaseService implements DatabaseService {
private static instance: AbsurdSqlDatabaseService | null = null;
private db: AbsurdSqlDatabase | null;
private initialized: boolean;
private initializationPromise: Promise<void> | null = null;
private operationQueue: QueuedOperation[] = [];
private isProcessingQueue: boolean = false;
private constructor() {
this.db = null;
@ -101,6 +111,68 @@ class AbsurdSqlDatabaseService implements DatabaseService {
await runMigrations(sqlExec);
this.initialized = true;
// Start processing the queue after initialization
this.processQueue();
}
private async processQueue(): Promise<void> {
if (this.isProcessingQueue || !this.initialized || !this.db) {
return;
}
this.isProcessingQueue = true;
while (this.operationQueue.length > 0) {
const operation = this.operationQueue.shift();
if (!operation) continue;
try {
let result;
switch (operation.type) {
case 'run':
result = await this.db.run(operation.sql, operation.params);
break;
case 'query':
result = await this.db.exec(operation.sql, operation.params);
break;
case 'getOneRow':
const queryResult = await this.db.exec(operation.sql, operation.params);
result = queryResult[0]?.values[0];
break;
case 'getAll':
const allResult = await this.db.exec(operation.sql, operation.params);
result = allResult[0]?.values || [];
break;
}
operation.resolve(result);
} catch (error) {
operation.reject(error);
}
}
this.isProcessingQueue = false;
}
private async queueOperation<T>(
type: QueuedOperation['type'],
sql: string,
params: unknown[] = [],
): Promise<T> {
return new Promise((resolve, reject) => {
this.operationQueue.push({
type,
sql,
params,
resolve,
reject,
});
// If we're already initialized, start processing the queue
if (this.initialized && this.db) {
this.processQueue();
}
});
}
private async waitForInitialization(): Promise<void> {
@ -133,13 +205,13 @@ class AbsurdSqlDatabaseService implements DatabaseService {
params: unknown[] = [],
): Promise<{ changes: number; lastId?: number }> {
await this.waitForInitialization();
return this.db!.run(sql, params);
return this.queueOperation<{ changes: number; lastId?: number }>('run', sql, params);
}
// Note that the resulting array may be empty if there are no results from the query
async query(sql: string, params: unknown[] = []): Promise<QueryExecResult[]> {
await this.waitForInitialization();
return this.db!.exec(sql, params);
return this.queueOperation<QueryExecResult[]>('query', sql, params);
}
async getOneRow(
@ -147,14 +219,12 @@ class AbsurdSqlDatabaseService implements DatabaseService {
params: unknown[] = [],
): Promise<unknown[] | undefined> {
await this.waitForInitialization();
const result = await this.db!.exec(sql, params);
return result[0]?.values[0];
return this.queueOperation<unknown[] | undefined>('getOneRow', sql, params);
}
async getAll(sql: string, params: unknown[] = []): Promise<unknown[][]> {
await this.waitForInitialization();
const result = await this.db!.exec(sql, params);
return result[0]?.values || [];
return this.queueOperation<unknown[][]>('getAll', sql, params);
}
}

Loading…
Cancel
Save