add queueing to the DB service so that requests get processed in order
This commit is contained in:
@@ -19,11 +19,21 @@ interface AbsurdSqlDatabase {
|
||||
) => Promise<{ changes: number; lastId?: number }>;
|
||||
}
|
||||
|
||||
interface QueuedOperation {
|
||||
type: 'run' | 'query' | 'getOneRow' | 'getAll';
|
||||
sql: string;
|
||||
params: unknown[];
|
||||
resolve: (value: any) => void;
|
||||
reject: (reason: any) => void;
|
||||
}
|
||||
|
||||
class AbsurdSqlDatabaseService implements DatabaseService {
|
||||
private static instance: AbsurdSqlDatabaseService | null = null;
|
||||
private db: AbsurdSqlDatabase | null;
|
||||
private initialized: boolean;
|
||||
private initializationPromise: Promise<void> | null = null;
|
||||
private operationQueue: QueuedOperation[] = [];
|
||||
private isProcessingQueue: boolean = false;
|
||||
|
||||
private constructor() {
|
||||
this.db = null;
|
||||
@@ -101,6 +111,68 @@ class AbsurdSqlDatabaseService implements DatabaseService {
|
||||
await runMigrations(sqlExec);
|
||||
|
||||
this.initialized = true;
|
||||
|
||||
// Start processing the queue after initialization
|
||||
this.processQueue();
|
||||
}
|
||||
|
||||
private async processQueue(): Promise<void> {
|
||||
if (this.isProcessingQueue || !this.initialized || !this.db) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.isProcessingQueue = true;
|
||||
|
||||
while (this.operationQueue.length > 0) {
|
||||
const operation = this.operationQueue.shift();
|
||||
if (!operation) continue;
|
||||
|
||||
try {
|
||||
let result;
|
||||
switch (operation.type) {
|
||||
case 'run':
|
||||
result = await this.db.run(operation.sql, operation.params);
|
||||
break;
|
||||
case 'query':
|
||||
result = await this.db.exec(operation.sql, operation.params);
|
||||
break;
|
||||
case 'getOneRow':
|
||||
const queryResult = await this.db.exec(operation.sql, operation.params);
|
||||
result = queryResult[0]?.values[0];
|
||||
break;
|
||||
case 'getAll':
|
||||
const allResult = await this.db.exec(operation.sql, operation.params);
|
||||
result = allResult[0]?.values || [];
|
||||
break;
|
||||
}
|
||||
operation.resolve(result);
|
||||
} catch (error) {
|
||||
operation.reject(error);
|
||||
}
|
||||
}
|
||||
|
||||
this.isProcessingQueue = false;
|
||||
}
|
||||
|
||||
private async queueOperation<T>(
|
||||
type: QueuedOperation['type'],
|
||||
sql: string,
|
||||
params: unknown[] = [],
|
||||
): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.operationQueue.push({
|
||||
type,
|
||||
sql,
|
||||
params,
|
||||
resolve,
|
||||
reject,
|
||||
});
|
||||
|
||||
// If we're already initialized, start processing the queue
|
||||
if (this.initialized && this.db) {
|
||||
this.processQueue();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async waitForInitialization(): Promise<void> {
|
||||
@@ -133,13 +205,13 @@ class AbsurdSqlDatabaseService implements DatabaseService {
|
||||
params: unknown[] = [],
|
||||
): Promise<{ changes: number; lastId?: number }> {
|
||||
await this.waitForInitialization();
|
||||
return this.db!.run(sql, params);
|
||||
return this.queueOperation<{ changes: number; lastId?: number }>('run', sql, params);
|
||||
}
|
||||
|
||||
// Note that the resulting array may be empty if there are no results from the query
|
||||
async query(sql: string, params: unknown[] = []): Promise<QueryExecResult[]> {
|
||||
await this.waitForInitialization();
|
||||
return this.db!.exec(sql, params);
|
||||
return this.queueOperation<QueryExecResult[]>('query', sql, params);
|
||||
}
|
||||
|
||||
async getOneRow(
|
||||
@@ -147,14 +219,12 @@ class AbsurdSqlDatabaseService implements DatabaseService {
|
||||
params: unknown[] = [],
|
||||
): Promise<unknown[] | undefined> {
|
||||
await this.waitForInitialization();
|
||||
const result = await this.db!.exec(sql, params);
|
||||
return result[0]?.values[0];
|
||||
return this.queueOperation<unknown[] | undefined>('getOneRow', sql, params);
|
||||
}
|
||||
|
||||
async getAll(sql: string, params: unknown[] = []): Promise<unknown[][]> {
|
||||
await this.waitForInitialization();
|
||||
const result = await this.db!.exec(sql, params);
|
||||
return result[0]?.values || [];
|
||||
return this.queueOperation<unknown[][]>('getAll', sql, params);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user