refactor: extract shared operation queue to reduce duplication

Extract queue management logic into reusable OperationQueue utility
shared between CapacitorPlatformService and AbsurdSqlDatabaseService.

Changes:
- Create OperationQueue.ts: shared queue handler with executor pattern
- Create types.ts: extract QueuedOperation interface to shared location
- Refactor CapacitorPlatformService: use shared queue, remove 65 lines
- Refactor AbsurdSqlDatabaseService: use shared queue, remove 22 lines
- Remove redundant logging wrapper: use centralized logger directly

Benefits:
- DRY: eliminated ~87 lines of duplicated queue logic
- Maintainability: queue behavior centralized in one place
- Consistency: both services use identical queue processing
- Flexibility: platform-specific parameter conversion preserved

File size reduction:
- CapacitorPlatformService: 1,525 → 1,465 lines (-60 lines)
- AbsurdSqlDatabaseService: 271 → 249 lines (-22 lines)
- New shared code: 150 lines (OperationQueue + types)

Net reduction: ~87 lines of duplicated code eliminated.
This commit is contained in:
Matthew Raymer
2025-11-09 06:27:46 +00:00
parent ee57fe9ea6
commit 523f88fd0d
4 changed files with 323 additions and 723 deletions

View File

@@ -21,14 +21,8 @@ import IndexedDBBackend from "absurd-sql/dist/indexeddb-backend";
import { runMigrations } from "../db-sql/migration";
import type { DatabaseService, QueryExecResult } from "../interfaces/database";
import { logger } from "@/utils/logger";
interface QueuedOperation {
type: "run" | "query";
sql: string;
params: unknown[];
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
}
import { OperationQueue, QueueExecutor } from "./platforms/OperationQueue";
import { QueuedOperation } from "./platforms/types";
interface AbsurdSqlDatabase {
exec: (sql: string, params?: unknown[]) => Promise<QueryExecResult[]>;
@@ -43,8 +37,7 @@ class AbsurdSqlDatabaseService implements DatabaseService {
private db: AbsurdSqlDatabase | null;
private initialized: boolean;
private initializationPromise: Promise<void> | null = null;
private operationQueue: Array<QueuedOperation> = [];
private isProcessingQueue: boolean = false;
private operationQueue = new OperationQueue<AbsurdSqlDatabase>();
private constructor() {
this.db = null;
@@ -162,41 +155,24 @@ class AbsurdSqlDatabaseService implements DatabaseService {
}
private async processQueue(): Promise<void> {
if (this.isProcessingQueue || !this.initialized || !this.db) {
if (!this.initialized || !this.db) {
return;
}
this.isProcessingQueue = true;
const executor: QueueExecutor<AbsurdSqlDatabase> = {
executeRun: async (db, sql, params) => {
return await db.run(sql, params);
},
executeQuery: async (db, sql, params) => {
return await db.exec(sql, params);
},
};
while (this.operationQueue.length > 0) {
const operation = this.operationQueue.shift();
if (!operation) continue;
try {
let result: unknown;
switch (operation.type) {
case "run":
result = await this.db.run(operation.sql, operation.params);
break;
case "query":
result = await this.db.exec(operation.sql, operation.params);
break;
}
operation.resolve(result);
} catch (error) {
logger.error(
"Error while processing SQL queue:",
error,
" ... for sql:",
operation.sql,
" ... with params:",
operation.params,
await this.operationQueue.processQueue(
this.db,
executor,
"AbsurdSqlDatabaseService",
);
operation.reject(error);
}
}
this.isProcessingQueue = false;
}
private async queueOperation<R>(
@@ -204,21 +180,24 @@ class AbsurdSqlDatabaseService implements DatabaseService {
sql: string,
params: unknown[] = [],
): Promise<R> {
return new Promise<R>((resolve, reject) => {
const operation: QueuedOperation = {
type,
sql,
params,
resolve: (value: unknown) => resolve(value as R),
reject,
resolve: (_value: unknown) => {
// No-op, will be wrapped by OperationQueue
},
reject: () => {
// No-op, will be wrapped by OperationQueue
},
};
this.operationQueue.push(operation);
// If we're already initialized, start processing the queue
if (this.initialized && this.db) {
this.processQueue();
}
});
return this.operationQueue.queueOperation<R>(
operation,
this.initialized,
this.db,
() => this.processQueue(),
);
}
private async waitForInitialization(): Promise<void> {

View File

@@ -23,14 +23,8 @@ import {
} from "../PlatformService";
import { logger } from "../../utils/logger";
import { BaseDatabaseService } from "./BaseDatabaseService";
interface QueuedOperation {
type: "run" | "query" | "rawQuery";
sql: string;
params: unknown[];
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
}
import { QueuedOperation } from "./types";
import { OperationQueue, QueueExecutor } from "./OperationQueue";
/**
* Platform service implementation for Capacitor (mobile) platform.
@@ -52,22 +46,61 @@ export class CapacitorPlatformService
private dbName = "timesafari.sqlite";
private initialized = false;
private initializationPromise: Promise<void> | null = null;
private operationQueue: Array<QueuedOperation> = [];
private isProcessingQueue: boolean = false;
// Metrics counters (v3.2)
private static initSuccessCount = 0;
private static initFailCount = 0;
private static connRetrievedCount = 0;
private static connRecreatedCount = 0;
private static migrationsAppliedCount = 0;
private static migrationsSkippedCount = 0;
private operationQueue = new OperationQueue<SQLiteDBConnection>();
constructor() {
super();
this.sqlite = new SQLiteConnection(CapacitorSQLite);
}
// ============================================================================
// Core Helper Functions (v3.5 - Minimal set)
// ============================================================================
/**
* Phase execution wrapper - logs once, throws on failure
*/
private async runPhase<T>(name: string, fn: () => Promise<T>): Promise<T> {
try {
const out = await fn();
logger.log(`[CapacitorPlatformService] ${name} ok`);
return out;
} catch (e) {
logger.error(`[CapacitorPlatformService] ${name} failed`, e);
throw e;
}
}
/**
* Apply SQLite PRAGMAs - single vetted sequence
*/
private async applyPragmas(db: SQLiteDBConnection): Promise<void> {
const platform = Capacitor.getPlatform();
if (platform !== "web") {
try {
await db.query("PRAGMA journal_mode = WAL;");
await db.query("PRAGMA synchronous = NORMAL;");
} catch {
// Non-fatal if WAL/synchronous fail
}
}
try {
await db.run("PRAGMA foreign_keys = ON;", []);
} catch {
// Ignore errors
}
try {
await db.query("PRAGMA busy_timeout = 5000;");
} catch {
// Fallback to run if query fails
try {
await db.run("PRAGMA busy_timeout = 5000;", []);
} catch {
// Ignore errors
}
}
}
/**
* Public entry point for database initialization (v3.2)
*
@@ -91,9 +124,7 @@ export class CapacitorPlatformService
// Start initialization
this.initializationPromise = this._initialize();
await this.initializationPromise;
CapacitorPlatformService.initSuccessCount++;
} catch (error) {
CapacitorPlatformService.initFailCount++;
this.initializationPromise = null; // Reset on failure for retry
// Best-effort cleanup on failure
@@ -146,93 +177,43 @@ export class CapacitorPlatformService
}
/**
* Internal initialization method (v3.3.3)
* Internal initialization method (v3.5 - Minimal refactor)
*
* Order: PRAGMAs (temp connection, no transactions) → connect → open → migrations (single txn) → verify
*
* ORDERING GUARD (v3.3.3+):
* - PRAGMAs MUST run before any execute/executeSet operations
* - This ensures database-level settings are applied before migrations
* - If ordering changes, add unit test that fails on execute before PRAGMAs
* Order: connect → pragmas → migrate → verify
*
* @private Internal method - use initializeDatabase() instead
*/
private async _initialize(): Promise<void> {
if (this.initialized) {
return;
}
if (this.initialized) return;
if (this.initializationPromise) return this.initializationPromise;
const initStartTime = Date.now();
const phaseTimings: Record<string, number> = {};
try {
// Phase 1: Apply PRAGMAs on temporary connection (MUST be before main connection)
// This ensures PRAGMAs run outside any transaction context
// ORDERING GUARD: No execute/executeSet operations should occur before this phase
logger.debug(
"[CapacitorPlatformService] Starting PRAGMA phase (before main connection)",
this.initializationPromise = (async () => {
const conn = await this.runPhase("connect", async () =>
this._getOrCreateConnection(this.dbName, {
version: 1,
encrypted: false,
mode: "no-encryption",
}),
);
const pragmasStart = Date.now();
const pragmaResult = await this._applyPragmasOnTempConnection();
phaseTimings.pragmas = Date.now() - pragmasStart;
const journalMode = pragmaResult.journalMode;
const pragmaPath = pragmaResult.path;
// Phase 2: Connection (with early consistency check)
const connectStart = Date.now();
await this._ensureConnection();
phaseTimings.connect = Date.now() - connectStart;
await this.runPhase("pragmas", async () => this.applyPragmas(conn));
// Phase 3: Open connection
const openStart = Date.now();
await this._ensureOpen();
phaseTimings.open = Date.now() - openStart;
await this.runPhase("migrate", async () => this.runCapacitorMigrations());
// Phase 4: Run migrations (single transaction)
const migrateStart = Date.now();
await this.runCapacitorMigrations();
phaseTimings.migrate = Date.now() - migrateStart;
// Phase 5: Verify integrity
const verifyStart = Date.now();
await this.verifyDatabaseIntegrity();
phaseTimings.verify = Date.now() - verifyStart;
await this.runPhase("verify", async () => this.verifyDatabaseIntegrity());
this.initialized = true;
const totalTime = Date.now() - initStartTime;
logger.log(
`[CapacitorPlatformService] SQLite database initialized successfully (${totalTime}ms total)`,
);
// Always log phase timings (can be disabled with LOG_DB_TIMINGS=false)
const shouldLogTimings = process.env.LOG_DB_TIMINGS !== "false";
if (shouldLogTimings) {
logger.log(`[CapacitorPlatformService] Init phase timings:`, {
...phaseTimings,
journal_mode: journalMode,
platform: Capacitor.getPlatform(),
pragma_path: pragmaPath,
});
}
// Start processing the queue after initialization
this.processQueue();
} catch (error) {
const totalTime = Date.now() - initStartTime;
logger.error(
`[CapacitorPlatformService] Error initializing SQLite database (${totalTime}ms):`,
error,
);
throw error; // Re-throw for cleanup handling
}
})().catch((e) => {
logger.error("[CapacitorPlatformService] Initialization failed", e);
throw e;
});
return this.initializationPromise;
}
/**
* Get or create connection with desync repair (v3.3.4)
*
* Authoritative retrieval method that repairs native/JS desync
* before creating a new connection.
* Get or create connection with desync repair (v3.5 - Minimal)
*/
private async _getOrCreateConnection(
name: string,
@@ -242,34 +223,40 @@ export class CapacitorPlatformService
mode?: string;
} = {},
): Promise<SQLiteDBConnection> {
logger.log(
`[CapacitorPlatformService] [DB] getOrCreateConnection(${name})`,
);
// Check if we already have an open connection
if (this.db) {
try {
const isOpen = await this.db.isDBOpen();
if (isOpen.result) {
return this.db;
}
} catch {
// Continue to retrieve/create
}
}
try {
const has = await this.sqlite.isConnection(name, false);
if (has.result) {
logger.log(
`[CapacitorPlatformService] [DB] retrieveConnection(${name})`,
);
const conn = await this.sqlite.retrieveConnection(name, false);
this.db = conn;
CapacitorPlatformService.connRetrievedCount++;
const isOpen = await conn.isDBOpen();
if (!isOpen.result) {
await conn.open();
}
return conn;
}
} catch (e) {
logger.warn(
`[CapacitorPlatformService] [DB] is/retrieve failed; will attempt repair: ${String(e)}`,
);
} catch {
// Will attempt repair below
}
// Repair JS/native desync (prevents phantom "already exists")
// Repair JS/native desync
try {
await this.sqlite.closeConnection(name, false);
} catch {
// Ignore close errors
// Ignore
}
logger.log(`[CapacitorPlatformService] [DB] createConnection(${name})`);
const conn = await this.sqlite.createConnection(
name,
false,
@@ -278,546 +265,40 @@ export class CapacitorPlatformService
opts.encrypted ?? false,
);
this.db = conn;
CapacitorPlatformService.connRecreatedCount++;
await conn.open();
return conn;
}
/**
* Ensure database connection exists (v3.3.4)
*
* Implements retrieve-first, create-second, close+create-last pattern.
* Short-circuits if fallback PRAGMA method already set this.db.
*
* ORDERING GUARD (v3.3.3+):
* - This method MUST be called AFTER _applyPragmasOnTempConnection()
* - PRAGMAs must run before any execute/executeSet operations
* - If this.db is already set by fallback PRAGMA path, return immediately
*/
private async _ensureConnection(): Promise<void> {
// 1) Short-circuit: If fallback PRAGMA method already set this.db, use it (invariant #3)
if (this.db) {
logger.log(
"[CapacitorPlatformService] [DB] ensure: already have connection (from PRAGMA fallback)",
);
return;
}
const name = this.dbName;
// 2) If PRAGMA phase ran before this, we might already be good
try {
const has = await this.sqlite.isConnection(name, false);
if (has.result) {
logger.log(
"[CapacitorPlatformService] [DB] ensure: retrieving existing post-PRAGMA",
);
const conn = await this.sqlite.retrieveConnection(name, false);
this.db = conn;
CapacitorPlatformService.connRetrievedCount++;
// Ensure it's open
if (!(await conn.isDBOpen())) {
await conn.open();
}
return;
}
} catch (e) {
logger.debug(
"[CapacitorPlatformService] [DB] ensure: isConnection check failed; continuing",
e,
);
}
// 3) Otherwise, create or repair using authoritative helper
await this._getOrCreateConnection(name, {
version: 1,
encrypted: false,
mode: "no-encryption",
});
// Ensure it's open
if (this.db && !(await this.db.isDBOpen())) {
await this.db.open();
}
}
/**
* Ensure database connection is open (v3.2)
*/
private async _ensureOpen(): Promise<void> {
if (!this.db) {
throw new Error("Database connection not established");
}
try {
await this.db.open();
} catch (openError: unknown) {
const openErrorMessage =
openError instanceof Error ? openError.message : String(openError);
// If already open, that's fine - continue
if (!openErrorMessage.includes("already open")) {
throw openError;
}
logger.debug(
"[CapacitorPlatformService] Database connection already open",
);
}
}
/**
* Extract error message from unknown error type (v3.3.2)
*
* Handles various error formats from Capacitor plugins and native code.
*
* @param e - Error object of unknown type
* @returns Extracted error message string
*/
private _errMsg(e: unknown): string {
if (e instanceof Error) return e.message ?? String(e);
if (e && typeof e === "object") {
const anyE = e as Record<string, unknown>;
return String(
anyE.message ??
anyE.error ??
anyE.errorMessage ??
anyE.toString?.() ??
e,
);
}
return String(e);
}
/**
* Check if error indicates connection already exists (v3.3.2)
*
* Matches common plugin error formats:
* - "CreateConnection: Connection <name>.sqlite already exists"
* - "<name> already exists" / "connection already exists"
*
* @param e - Error object to check
* @returns true if error indicates connection already exists
*/
private _isAlreadyExists(e: unknown): boolean {
const m = this._errMsg(e);
// Cover common plugin formats
return (
/already\s+exist/i.test(m) &&
/(createconnection|connection|sqlite)/i.test(m)
);
}
/**
* Apply SQLite PRAGMAs on temporary connection (v3.3.3)
*
* CRITICAL: PRAGMAs must run OUTSIDE any transaction.
* Uses a temporary connection to avoid conflicts with main connection.
* PRAGMAs are database-level settings that persist across connections.
*
* Strategy:
* - If main connection exists, use fallback method immediately
* - Otherwise, create temp connection, apply PRAGMAs, close it
* - Never close active connections from this path
*
* iOS Compatibility:
* - WAL generally works on iOS; if refused, log and continue (don't throw)
* - Only set synchronous=NORMAL when WAL is effective
* - Don't attempt WAL on read-only bundles or protected file modes
*
* v3.3.3: Fixed busy_timeout PRAGMA to use query() instead of run() to avoid
* "error code 100: another row available" on some Capacitor SQLite builds.
*
* @returns Object with journal_mode value
*/
private async _applyPragmasOnTempConnection(): Promise<{
journalMode: string;
path: "temp" | "fallback";
}> {
logger.log("[CapacitorPlatformService] [PRAGMA] temp path -> start");
const platform = Capacitor.getPlatform();
// If any main connection exists, go straight to fallback
try {
const hasMain = (await this.sqlite.isConnection(this.dbName, false))
.result;
if (hasMain) {
logger.debug(
"[CapacitorPlatformService] PRAGMA: main connection present → using fallback",
);
return await this._applyPragmasOnExistingConnection();
}
} catch (chkErr) {
logger.debug(
"[CapacitorPlatformService] PRAGMA: isConnection check failed; using fallback",
chkErr,
);
return await this._applyPragmasOnExistingConnection();
}
let pragmaConn: SQLiteDBConnection | undefined;
try {
// Use the SAME dbName to hit the same file
pragmaConn = await this.sqlite.createConnection(
this.dbName,
false,
"no-encryption",
1,
false,
);
} catch (e) {
if (this._isAlreadyExists(e)) {
logger.warn(
"[CapacitorPlatformService] [PRAGMA] temp says 'already exists' → using fallback",
);
return await this._applyPragmasOnExistingConnection();
}
// Unknown create failure → don't risk close; surface it
logger.error(
`[CapacitorPlatformService] [PRAGMA] temp path failed: ${this._errMsg(e)}`,
);
throw e;
}
try {
await pragmaConn.open();
// Foreign keys (safe via run)
await pragmaConn.run("PRAGMA foreign_keys = ON;", []);
// journal_mode via query (returns a row) - invariant #2
let journalMode = "unknown";
if (platform !== "web") {
try {
const res = await pragmaConn.query("PRAGMA journal_mode = WAL;");
const row = res.values?.[0] as Record<string, unknown> | undefined;
const key = row ? Object.keys(row)[0] : undefined;
journalMode = (key ? (row![key] as string) : undefined) ?? "unknown";
// iOS compatibility: WAL may be refused on read-only bundles
// Log but don't throw - device may not support WAL
if (journalMode !== "wal") {
logger.debug(
`[CapacitorPlatformService] WAL mode not effective (got: ${journalMode}), continuing with default`,
);
}
} catch (walError) {
// Non-fatal: WAL failure is acceptable (device may not support it)
logger.debug(
"[CapacitorPlatformService] WAL mode failed, continuing with default:",
this._errMsg(walError),
);
journalMode = "default";
}
// Only set synchronous when WAL is effective (invariant #4)
if (journalMode === "wal") {
try {
await pragmaConn.run("PRAGMA synchronous = NORMAL;", []);
} catch (syncError) {
// Non-fatal if synchronous fails (may be in transaction)
logger.debug(
"[CapacitorPlatformService] synchronous=NORMAL failed (non-critical):",
this._errMsg(syncError),
);
}
}
} else {
journalMode = "default";
}
// busy_timeout: use query() and ignore the row to avoid "another row available" (v3.3.3)
// v3.3.3: Fixed "error code 100: another row available" by using query() instead of run()
// Some Capacitor SQLite builds return busy_timeout as a row, which causes run() to fail
try {
// Some builds return the effective timeout as a row; query() is the safe call.
await pragmaConn.query("PRAGMA busy_timeout = 5000;");
} catch (e) {
// As a fallback (rare), try run(); but tolerate row/done mismatch
try {
await pragmaConn.run("PRAGMA busy_timeout = 5000;", []);
} catch (e2) {
const msg = (
e2 instanceof Error ? e2.message : String(e2)
).toLowerCase();
if (msg.includes("another row available")) {
// benign on some plugins; swallow
} else {
// If you prefer, just warn and continue—don't fail init on busy_timeout
logger.debug(
"[CapacitorPlatformService] busy_timeout warning:",
e2,
);
}
}
}
logger.debug(
`[CapacitorPlatformService] PRAGMAs applied (temp, v3.3.3): journal_mode=${journalMode}`,
);
logger.log(
"[CapacitorPlatformService] [PRAGMA] temp path -> done (v3.3.3)",
);
logger.log("[CapacitorPlatformService] PRAGMA path: temp");
return { journalMode, path: "temp" as const };
} finally {
try {
await pragmaConn?.close();
} catch {
// Ignore close errors
}
try {
await this.sqlite.closeConnection(this.dbName, false);
} catch {
// Ignore close errors
}
}
}
/**
* Apply PRAGMAs on existing connection (v3.3.3 fallback)
*
* Always completes and sets this.db. Repairs desync if needed.
* Does not set synchronous (often fails in transaction).
*
* v3.3.3: Fixed busy_timeout PRAGMA to use query() instead of run() to avoid
* "error code 100: another row available" on some Capacitor SQLite builds.
*
* INVARIANTS (v3.3.3+ - must not regress):
* 1. foreign_keys: Always set via run() (no return value)
* 2. journal_mode: Always set via query() (returns mode as row)
* - On iOS/web: If WAL refused, log + continue (journalMode = "unknown")
* - Never throw on WAL failure (device may not support it)
* 3. busy_timeout: Always set via query() first, fallback to run() if needed
* - Swallow benign "another row available" errors
* 4. synchronous: SKIPPED in fallback path (often fails in transaction)
* - Only set in temp path when journal_mode === 'wal'
* 5. MUST set this.db before returning (even on error, if conn exists)
*
* iOS Compatibility:
* - WAL may be refused on read-only bundles or protected file modes
* - If journal_mode !== 'wal', synchronous is not set (as documented)
* - All PRAGMA failures are non-fatal (log + continue)
*/
private async _applyPragmasOnExistingConnection(): Promise<{
journalMode: string;
path: "temp" | "fallback";
}> {
logger.log("[CapacitorPlatformService] [PRAGMA] path: fallback -> start");
const dbName = this.dbName;
let conn: SQLiteDBConnection | null = null;
try {
// Try to retrieve existing connection
const has = await this.sqlite.isConnection(dbName, false);
if (has.result) {
conn = await this.sqlite.retrieveConnection(dbName, false);
logger.log(
"[CapacitorPlatformService] [PRAGMA] fallback: retrieved existing connection",
);
// Retrieved connection - check if open
try {
const isOpen = await conn.isDBOpen();
if (!isOpen) {
await conn.open();
logger.log(
"[CapacitorPlatformService] [PRAGMA] fallback: opened DB (retrieved)",
);
} else {
logger.debug(
"[CapacitorPlatformService] [PRAGMA] fallback: DB already open",
);
}
} catch (openError) {
// If isDBOpen check fails, try opening anyway
logger.debug(
"[CapacitorPlatformService] [PRAGMA] fallback: isDBOpen check failed, attempting open:",
openError,
);
await conn.open();
logger.log(
"[CapacitorPlatformService] [PRAGMA] fallback: opened DB (after check error)",
);
}
} else {
logger.warn(
"[CapacitorPlatformService] [PRAGMA] fallback: no JS handle; repairing",
);
// Desync repair
try {
await this.sqlite.closeConnection(dbName, false);
} catch {
// Ignore close errors
}
conn = await this.sqlite.createConnection(
dbName,
false,
"no-encryption",
1,
false,
);
logger.log(
"[CapacitorPlatformService] [PRAGMA] fallback: created connection post-repair",
);
// Newly created connections are never open - must open explicitly
await conn.open();
logger.log(
"[CapacitorPlatformService] [PRAGMA] fallback: opened DB (new connection)",
);
}
// Apply PRAGMAs
const platform = Capacitor.getPlatform();
let journalMode = "unknown";
// FK always on
try {
await conn.run("PRAGMA foreign_keys = ON;", []);
} catch {
// Ignore errors
}
// journal_mode via query (safe even if txn-wrapped) - invariant #2
if (platform !== "web") {
try {
const res = await conn.query("PRAGMA journal_mode = WAL;");
const row = res.values?.[0] as Record<string, unknown> | undefined;
const key = row ? Object.keys(row)[0] : undefined;
journalMode = (key ? (row![key] as string) : undefined) ?? "unknown";
// iOS compatibility: WAL may be refused on read-only bundles
// Log but don't throw - device may not support WAL
if (journalMode !== "wal") {
logger.debug(
`[CapacitorPlatformService] [PRAGMA] fallback: WAL mode not effective (got: ${journalMode}), continuing`,
);
}
} catch (walError) {
// Non-fatal: WAL failure is acceptable (device may not support it)
logger.debug(
"[CapacitorPlatformService] [PRAGMA] fallback: WAL mode failed, continuing:",
this._errMsg(walError),
);
journalMode = "default";
}
} else {
journalMode = "default";
}
// NOTE: synchronous is SKIPPED in fallback path (invariant #4)
// It often fails in transaction context, and temp path handles it when WAL is effective
// busy_timeout: use query() and ignore the row to avoid "another row available" (v3.3.3)
// v3.3.3: Fixed "error code 100: another row available" by using query() instead of run()
// Some Capacitor SQLite builds return busy_timeout as a row, which causes run() to fail
try {
// Some builds return the effective timeout as a row; query() is the safe call.
await conn.query("PRAGMA busy_timeout = 5000;");
} catch (e) {
// As a fallback (rare), try run(); but tolerate row/done mismatch
try {
await conn.run("PRAGMA busy_timeout = 5000;", []);
} catch (e2) {
const msg = (
e2 instanceof Error ? e2.message : String(e2)
).toLowerCase();
if (msg.includes("another row available")) {
// benign on some plugins; swallow
} else {
// If you prefer, just warn and continue—don't fail init on busy_timeout
logger.debug(
"[CapacitorPlatformService] busy_timeout warning:",
e2,
);
}
}
}
// CRUCIAL: Set this.db before returning (invariant #5)
this.db = conn;
logger.log(
`[CapacitorPlatformService] [PRAGMA] fallback: PRAGMAs applied; this.db set (journal_mode=${journalMode}, v3.3.3)`,
);
logger.log("[CapacitorPlatformService] PRAGMA path: fallback (v3.3.3)");
return { journalMode, path: "fallback" as const };
} catch (e) {
logger.error(
`[CapacitorPlatformService] [PRAGMA] fallback failed: ${String(e)}`,
);
// Even on error, try to set this.db if we have a connection
if (conn) {
this.db = conn;
logger.debug(
"[CapacitorPlatformService] [PRAGMA] fallback: set this.db despite error",
);
}
// Return unknown mode but still indicate fallback path was taken
logger.log("[CapacitorPlatformService] PRAGMA path: fallback");
return { journalMode: "unknown", path: "fallback" as const };
}
}
private async processQueue(): Promise<void> {
if (this.isProcessingQueue || !this.initialized || !this.db) {
if (!this.initialized || !this.db) {
return;
}
this.isProcessingQueue = true;
while (this.operationQueue.length > 0) {
const operation = this.operationQueue.shift();
if (!operation) continue;
try {
let result: unknown;
switch (operation.type) {
case "run": {
const runResult = await this.db.run(
operation.sql,
operation.params,
);
result = {
const executor: QueueExecutor<SQLiteDBConnection> = {
executeRun: async (db, sql, params) => {
const runResult = await db.run(sql, params);
return {
changes: runResult.changes?.changes || 0,
lastId: runResult.changes?.lastId,
};
break;
}
case "query": {
const queryResult = await this.db.query(
operation.sql,
operation.params,
);
result = {
},
executeQuery: async (db, sql, params) => {
const queryResult = await db.query(sql, params);
return {
columns: Object.keys(queryResult.values?.[0] || {}),
values: (queryResult.values || []).map((row) =>
Object.values(row),
),
values: (queryResult.values || []).map((row) => Object.values(row)),
};
},
executeRawQuery: async (db, sql, params) => {
return await db.query(sql, params);
},
};
break;
}
case "rawQuery": {
const queryResult = await this.db.query(
operation.sql,
operation.params,
);
result = queryResult;
break;
}
}
operation.resolve(result);
} catch (error) {
logger.error(
"[CapacitorPlatformService] Error while processing SQL queue:",
error,
);
logger.error(
`[CapacitorPlatformService] Failed operation - Type: ${operation.type}, SQL: ${operation.sql}`,
);
logger.error(
`[CapacitorPlatformService] Failed operation - Params:`,
operation.params,
);
operation.reject(error);
}
}
this.isProcessingQueue = false;
await this.operationQueue.processQueue(
this.db,
executor,
"CapacitorPlatformService",
);
}
private async queueOperation<R>(
@@ -931,22 +412,37 @@ export class CapacitorPlatformService
operation.type = type;
operation.sql = sql;
operation.params = plainParams;
operation.resolve = (value: unknown) => resolve(value as R);
operation.resolve = (value: unknown) => {
resolve(value as R);
};
operation.reject = reject;
// Step 3: Freeze everything to prevent modification
Object.freeze(operation.params);
Object.freeze(operation);
this.operationQueue.push(operation);
// If we're already initialized, start processing the queue
if (this.initialized && this.db) {
this.processQueue();
}
// Use shared queue utility
this.operationQueue.queueOperation<R>(
operation,
this.initialized,
this.db,
() => this.processQueue(),
);
});
}
/**
* Ensure database is initialized (v3.4 - Re-entrancy guard)
* Safe to call from multiple concurrent callers - only one init will run
*/
private async ensureInitialized(): Promise<void> {
if (this.initialized) {
return;
}
// Safe: initializeDatabase() returns existing promise if in flight
await this.initializeDatabase();
}
private async waitForInitialization(): Promise<void> {
// If we have an initialization promise, wait for it
if (this.initializationPromise) {
@@ -956,7 +452,7 @@ export class CapacitorPlatformService
// If not initialized and no promise, start initialization
if (!this.initialized) {
await this.initializeDatabase();
await this.ensureInitialized();
return;
}
@@ -1964,27 +1460,4 @@ export class CapacitorPlatformService
// generateInsertStatement, updateDefaultSettings, updateActiveDid,
// getActiveIdentity, insertNewDidIntoSettings, updateDidSpecificSettings,
// retrieveSettingsForActiveAccount are all inherited from BaseDatabaseService
/**
* Get database initialization metrics (v3.2)
*
* @returns Object containing initialization statistics
*/
public static getMetrics(): {
initSuccess: number;
initFail: number;
connRetrieved: number;
connRecreated: number;
migrationsApplied: number;
migrationsSkipped: number;
} {
return {
initSuccess: CapacitorPlatformService.initSuccessCount,
initFail: CapacitorPlatformService.initFailCount,
connRetrieved: CapacitorPlatformService.connRetrievedCount,
connRecreated: CapacitorPlatformService.connRecreatedCount,
migrationsApplied: CapacitorPlatformService.migrationsAppliedCount,
migrationsSkipped: CapacitorPlatformService.migrationsSkippedCount,
};
}
}

View File

@@ -0,0 +1,135 @@
/**
* Shared operation queue handler for database services
*
* Provides a reusable queue mechanism for database operations that need to
* wait for initialization before execution.
*/
import { QueuedOperation } from "./types";
import { logger } from "../../utils/logger";
export interface QueueExecutor<TDb> {
executeRun(db: TDb, sql: string, params: unknown[]): Promise<unknown>;
executeQuery(db: TDb, sql: string, params: unknown[]): Promise<unknown>;
executeRawQuery?(db: TDb, sql: string, params: unknown[]): Promise<unknown>;
}
export class OperationQueue<TDb> {
private operationQueue: Array<QueuedOperation> = [];
private isProcessingQueue: boolean = false;
/**
* Process queued operations
*/
async processQueue(
db: TDb,
executor: QueueExecutor<TDb>,
serviceName: string,
): Promise<void> {
if (this.isProcessingQueue || !db) {
return;
}
this.isProcessingQueue = true;
while (this.operationQueue.length > 0) {
const operation = this.operationQueue.shift();
if (!operation) continue;
try {
let result: unknown;
switch (operation.type) {
case "run":
result = await executor.executeRun(
db,
operation.sql,
operation.params,
);
break;
case "query":
result = await executor.executeQuery(
db,
operation.sql,
operation.params,
);
break;
case "rawQuery":
if (executor.executeRawQuery) {
result = await executor.executeRawQuery(
db,
operation.sql,
operation.params,
);
} else {
// Fallback to query if rawQuery not supported
result = await executor.executeQuery(
db,
operation.sql,
operation.params,
);
}
break;
}
operation.resolve(result);
} catch (error) {
logger.error(
`[${serviceName}] Error while processing SQL queue:`,
error,
);
logger.error(
`[${serviceName}] Failed operation - Type: ${operation.type}, SQL: ${operation.sql}`,
);
logger.error(
`[${serviceName}] Failed operation - Params:`,
operation.params,
);
operation.reject(error);
}
}
this.isProcessingQueue = false;
}
/**
* Queue an operation for later execution
*
* @param operation - Pre-constructed operation object (allows platform-specific parameter conversion)
* @param initialized - Whether the database is initialized
* @param db - Database connection (if available)
* @param onQueue - Callback to trigger queue processing
*/
queueOperation<R>(
operation: QueuedOperation,
initialized: boolean,
db: TDb | null,
onQueue: () => void,
): Promise<R> {
return new Promise<R>((resolve, reject) => {
// Wrap the operation's resolve/reject to match our Promise
const wrappedOperation: QueuedOperation = {
...operation,
resolve: (value: unknown) => {
operation.resolve(value);
resolve(value as R);
},
reject: (reason: unknown) => {
operation.reject(reason);
reject(reason);
},
};
this.operationQueue.push(wrappedOperation);
// If already initialized, trigger queue processing
if (initialized && db) {
onQueue();
}
});
}
/**
* Get current queue length (for debugging)
*/
getQueueLength(): number {
return this.operationQueue.length;
}
}

View File

@@ -0,0 +1,13 @@
/**
* Types for platform services
*/
export interface QueuedOperation {
type: "run" | "query" | "rawQuery";
sql: string;
params: unknown[];
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
}
export type QueuedOperationType = QueuedOperation["type"];