feat(polling-contracts): add generic polling interface with TypeScript types and Zod schemas
- Add @timesafari/polling-contracts package with comprehensive type definitions - Implement GenericPollingRequest, PollingResult, and PollingScheduleConfig interfaces - Add Zod schemas for StarredProjectsRequest/Response and DeepLinkParams validation - Include calculateBackoffDelay utility with unified retry policy (exponential, linear, fixed) - Add OutboxPressureManager for storage pressure controls and back-pressure signals - Implement TelemetryManager with cardinality budgets and PII redaction - Add ClockSyncManager for JWT timestamp validation and skew tolerance - Include comprehensive unit tests with Jest snapshots and race condition testing - Add JWT_ID_PATTERN regex for canonical JWT ID format validation - Support idempotency with X-Idempotency-Key enforcement - Implement watermark CAS (Compare-and-Swap) for race condition prevention This establishes the foundation for the new generic polling system where host apps define request/response schemas and the plugin provides robust polling logic.
This commit is contained in:
265
packages/polling-contracts/src/__tests__/watermark-cas.test.ts
Normal file
265
packages/polling-contracts/src/__tests__/watermark-cas.test.ts
Normal file
@@ -0,0 +1,265 @@
|
||||
/**
|
||||
* Watermark CAS race condition tests
|
||||
*/
|
||||
|
||||
import { compareJwtIds } from '../validation';
|
||||
|
||||
describe('Watermark CAS Race Conditions', () => {
|
||||
const testJwtIds = [
|
||||
'1704067200_abc123_12345678', // 2024-01-01 00:00:00
|
||||
'1704153600_mno345_87654321', // 2024-01-02 00:00:00
|
||||
'1704240000_new123_abcdef01', // 2024-01-03 00:00:00
|
||||
'1704326400_xyz789_23456789', // 2024-01-04 00:00:00
|
||||
'1704412800_stu901_34567890' // 2024-01-05 00:00:00
|
||||
];
|
||||
|
||||
describe('Concurrent Bootstrap Race', () => {
|
||||
it('should handle two clients bootstrapping concurrently', async () => {
|
||||
// Simulate two clients fetching the same data concurrently
|
||||
const client1Bootstrap = testJwtIds[2]; // 2024-01-03
|
||||
const client2Bootstrap = testJwtIds[3]; // 2024-01-04 (newer)
|
||||
|
||||
// Client 1 attempts to set watermark
|
||||
const client1Result = await simulateWatermarkUpdate(null, client1Bootstrap);
|
||||
expect(client1Result.success).toBe(true);
|
||||
expect(client1Result.watermark).toBe(client1Bootstrap);
|
||||
|
||||
// Client 2 attempts to set watermark (should succeed due to CAS)
|
||||
const client2Result = await simulateWatermarkUpdate(null, client2Bootstrap);
|
||||
expect(client2Result.success).toBe(true);
|
||||
expect(client2Result.watermark).toBe(client2Bootstrap);
|
||||
|
||||
// Final watermark should be the maximum JWT ID
|
||||
const finalWatermark = await getCurrentWatermark();
|
||||
expect(finalWatermark).toBe(client2Bootstrap);
|
||||
if (finalWatermark && client1Bootstrap) {
|
||||
expect(compareJwtIds(finalWatermark, client1Bootstrap)).toBeGreaterThan(0);
|
||||
}
|
||||
});
|
||||
|
||||
it('should reject older watermark updates', async () => {
|
||||
// Set initial watermark
|
||||
await simulateWatermarkUpdate(null, testJwtIds[3]); // 2024-01-04
|
||||
|
||||
// Attempt to set older watermark (should fail)
|
||||
const result = await simulateWatermarkUpdate(testJwtIds[3], testJwtIds[1]); // 2024-01-02
|
||||
expect(result.success).toBe(false);
|
||||
expect(result.watermark).toBe(testJwtIds[3]); // Unchanged
|
||||
});
|
||||
|
||||
it('should handle null watermark bootstrap', async () => {
|
||||
// First client sets watermark from null
|
||||
const result1 = await simulateWatermarkUpdate(null, testJwtIds[2]);
|
||||
expect(result1.success).toBe(true);
|
||||
|
||||
// Second client attempts to set watermark from null (should fail)
|
||||
const result2 = await simulateWatermarkUpdate(null, testJwtIds[1]);
|
||||
expect(result2.success).toBe(false);
|
||||
expect(result2.watermark).toBe(testJwtIds[2]); // First client's watermark
|
||||
});
|
||||
});
|
||||
|
||||
describe('Overlapping Polls Race', () => {
|
||||
it('should handle overlapping polls with different results', async () => {
|
||||
// Set initial watermark
|
||||
await simulateWatermarkUpdate(null, testJwtIds[1]); // 2024-01-02
|
||||
|
||||
// Client 1 polls and finds changes up to 2024-01-03
|
||||
const client1Changes = [testJwtIds[2]]; // 2024-01-03
|
||||
const client1Result = await simulatePollAndUpdate(testJwtIds[1], client1Changes);
|
||||
expect(client1Result.success).toBe(true);
|
||||
expect(client1Result.newWatermark).toBe(testJwtIds[2]);
|
||||
|
||||
// Client 2 polls concurrently and finds changes up to 2024-01-04
|
||||
const client2Changes = [testJwtIds[2], testJwtIds[3]]; // 2024-01-03, 2024-01-04
|
||||
const client2Result = await simulatePollAndUpdate(testJwtIds[1], client2Changes);
|
||||
expect(client2Result.success).toBe(true);
|
||||
expect(client2Result.newWatermark).toBe(testJwtIds[3]);
|
||||
|
||||
// Final watermark should be the maximum
|
||||
const finalWatermark = await getCurrentWatermark();
|
||||
expect(finalWatermark).toBe(testJwtIds[3]);
|
||||
});
|
||||
|
||||
it('should prevent duplicate notifications from overlapping polls', async () => {
|
||||
// Set initial watermark
|
||||
await simulateWatermarkUpdate(null, testJwtIds[1]);
|
||||
|
||||
// Both clients find the same change
|
||||
const sharedChange = testJwtIds[2];
|
||||
|
||||
// Client 1 processes change
|
||||
const client1Result = await simulatePollAndUpdate(testJwtIds[1], [sharedChange]);
|
||||
expect(client1Result.success).toBe(true);
|
||||
expect(client1Result.notificationsGenerated).toBe(1);
|
||||
|
||||
// Client 2 attempts to process same change (should be no-op)
|
||||
const client2Result = await simulatePollAndUpdate(testJwtIds[1], [sharedChange]);
|
||||
expect(client2Result.success).toBe(false); // No new changes
|
||||
expect(client2Result.notificationsGenerated).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Watermark Monotonicity', () => {
|
||||
it('should maintain monotonic watermark advancement', async () => {
|
||||
let currentWatermark = null;
|
||||
|
||||
// Process changes in order
|
||||
for (let i = 0; i < testJwtIds.length; i++) {
|
||||
const newWatermark = testJwtIds[i];
|
||||
const result = await simulateWatermarkUpdate(currentWatermark, newWatermark);
|
||||
|
||||
expect(result.success).toBe(true);
|
||||
expect(result.watermark).toBe(newWatermark);
|
||||
|
||||
// Verify monotonicity
|
||||
if (currentWatermark) {
|
||||
expect(compareJwtIds(newWatermark, currentWatermark)).toBeGreaterThan(0);
|
||||
}
|
||||
|
||||
currentWatermark = newWatermark;
|
||||
}
|
||||
});
|
||||
|
||||
it('should reject non-monotonic watermark updates', async () => {
|
||||
// Set watermark to middle value
|
||||
await simulateWatermarkUpdate(null, testJwtIds[2]);
|
||||
|
||||
// Attempt to set older watermark
|
||||
const result = await simulateWatermarkUpdate(testJwtIds[2], testJwtIds[0]);
|
||||
expect(result.success).toBe(false);
|
||||
|
||||
// Attempt to set same watermark
|
||||
const result2 = await simulateWatermarkUpdate(testJwtIds[2], testJwtIds[2]);
|
||||
expect(result2.success).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Platform-Specific CAS Implementation', () => {
|
||||
it('should verify SQL CAS returns row update count', async () => {
|
||||
// Simulate SQL UPDATE ... WHERE condition
|
||||
const sqlResult = await simulateSqlWatermarkUpdate(null, testJwtIds[0]);
|
||||
expect(sqlResult.rowsAffected).toBe(1);
|
||||
|
||||
// Attempt to update with same condition (should affect 0 rows)
|
||||
const sqlResult2 = await simulateSqlWatermarkUpdate(null, testJwtIds[0]);
|
||||
expect(sqlResult2.rowsAffected).toBe(0);
|
||||
});
|
||||
|
||||
it('should verify Room CAS returns update count', async () => {
|
||||
// Simulate Room @Query with return type Int
|
||||
const roomResult = await simulateRoomWatermarkUpdate(null, testJwtIds[0]);
|
||||
expect(roomResult).toBe(1);
|
||||
|
||||
// Attempt to update with same condition
|
||||
const roomResult2 = await simulateRoomWatermarkUpdate(null, testJwtIds[0]);
|
||||
expect(roomResult2).toBe(0);
|
||||
});
|
||||
|
||||
it('should verify Core Data CAS returns success boolean', async () => {
|
||||
// Simulate Core Data compare-and-swap
|
||||
const coreDataResult = await simulateCoreDataWatermarkUpdate(null, testJwtIds[0]);
|
||||
expect(coreDataResult).toBe(true);
|
||||
|
||||
// Attempt to update with same condition
|
||||
const coreDataResult2 = await simulateCoreDataWatermarkUpdate(null, testJwtIds[0]);
|
||||
expect(coreDataResult2).toBe(false);
|
||||
});
|
||||
|
||||
it('should verify IndexedDB CAS returns success boolean', async () => {
|
||||
// Simulate IndexedDB transaction
|
||||
const idbResult = await simulateIndexedDBWatermarkUpdate(null, testJwtIds[0]);
|
||||
expect(idbResult).toBe(true);
|
||||
|
||||
// Attempt to update with same condition
|
||||
const idbResult2 = await simulateIndexedDBWatermarkUpdate(null, testJwtIds[0]);
|
||||
expect(idbResult2).toBe(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// Mock implementations for testing
|
||||
let mockWatermark: string | null = null;
|
||||
|
||||
async function simulateWatermarkUpdate(
|
||||
expectedWatermark: string | null,
|
||||
newWatermark: string
|
||||
): Promise<{ success: boolean; watermark: string | null }> {
|
||||
// Simulate CAS logic
|
||||
if (mockWatermark === expectedWatermark) {
|
||||
mockWatermark = newWatermark;
|
||||
return { success: true, watermark: newWatermark };
|
||||
}
|
||||
return { success: false, watermark: mockWatermark };
|
||||
}
|
||||
|
||||
async function simulatePollAndUpdate(
|
||||
currentWatermark: string | null,
|
||||
changes: string[]
|
||||
): Promise<{ success: boolean; newWatermark: string | null; notificationsGenerated: number }> {
|
||||
if (changes.length === 0) {
|
||||
return { success: false, newWatermark: currentWatermark, notificationsGenerated: 0 };
|
||||
}
|
||||
|
||||
const latestChange = changes[changes.length - 1];
|
||||
const result = await simulateWatermarkUpdate(currentWatermark, latestChange);
|
||||
|
||||
return {
|
||||
success: result.success,
|
||||
newWatermark: result.watermark,
|
||||
notificationsGenerated: result.success ? changes.length : 0
|
||||
};
|
||||
}
|
||||
|
||||
async function getCurrentWatermark(): Promise<string | null> {
|
||||
return mockWatermark;
|
||||
}
|
||||
|
||||
async function simulateSqlWatermarkUpdate(
|
||||
expectedWatermark: string | null,
|
||||
newWatermark: string
|
||||
): Promise<{ rowsAffected: number }> {
|
||||
if (mockWatermark === expectedWatermark) {
|
||||
mockWatermark = newWatermark;
|
||||
return { rowsAffected: 1 };
|
||||
}
|
||||
return { rowsAffected: 0 };
|
||||
}
|
||||
|
||||
async function simulateRoomWatermarkUpdate(
|
||||
expectedWatermark: string | null,
|
||||
newWatermark: string
|
||||
): Promise<number> {
|
||||
if (mockWatermark === expectedWatermark) {
|
||||
mockWatermark = newWatermark;
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
async function simulateCoreDataWatermarkUpdate(
|
||||
expectedWatermark: string | null,
|
||||
newWatermark: string
|
||||
): Promise<boolean> {
|
||||
if (mockWatermark === expectedWatermark) {
|
||||
mockWatermark = newWatermark;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async function simulateIndexedDBWatermarkUpdate(
|
||||
expectedWatermark: string | null,
|
||||
newWatermark: string
|
||||
): Promise<boolean> {
|
||||
if (mockWatermark === expectedWatermark) {
|
||||
mockWatermark = newWatermark;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Reset mock state before each test
|
||||
beforeEach(() => {
|
||||
mockWatermark = null;
|
||||
});
|
||||
Reference in New Issue
Block a user