You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

283 lines
8.3 KiB

/**
* Callback Registry Implementation
* Provides uniform callback lifecycle usable from any platform
*
* @author Matthew Raymer
* @version 1.1.0
*/
/**
* Delivery mechanism of a registered callback.
*
* As implemented by the registry class in this file:
* - 'http'  — the event is POSTed as JSON to the record's `target` via `fetch`.
* - 'local' — an in-process function registered under the same id is invoked.
* - 'queue' — placeholder; the event is currently only logged.
*/
export type CallbackKind = 'http' | 'local' | 'queue';
/**
* Event handed to every enabled callback when the registry fires.
* For 'http' callbacks all fields are serialised into the JSON request body.
*/
export interface CallbackEvent {
/** Identifier of this event. */
id: string;
/** Numeric time of the event — presumably epoch milliseconds; TODO confirm with the producers. */
at: number;
/** Lifecycle stage being reported (fetch or notify phase and its outcome). */
type: 'onFetchStart' | 'onFetchSuccess' | 'onFetchFailure' |
'onNotifyStart' | 'onNotifyDelivered' | 'onNotifySkippedTTL' | 'onNotifyFailure';
/** Optional event-specific data; its shape is not constrained here. */
payload?: unknown;
}
/**
* In-process handler used for 'local' callbacks.
* May be synchronous or return a promise; the registry awaits the result, so
* a thrown error or rejected promise counts as a failed delivery.
*/
export type CallbackFunction = (e: CallbackEvent) => Promise<void> | void;
/**
* Description of one registered callback.
*/
export interface CallbackRecord {
/** Identifier used to look up the callback's circuit breaker, retry queue and local handler. */
id: string;
/** How events are delivered to this callback. */
kind: CallbackKind;
/** For 'http' callbacks, the URL passed to `fetch`; the other kinds do not read it in this file. */
target: string;
/** Extra HTTP headers for 'http' deliveries; they are merged over the default JSON Content-Type. */
headers?: Record<string, string>;
/** Only records with `enabled === true` receive fired events. */
enabled: boolean;
/** Numeric creation time; contributes to `lastActivity` in the registry status. */
createdAt: number;
/** Read by the registry when it schedules a retry; the registry itself never writes it. */
retryCount?: number;
/** NOTE(review): not read or written by the registry in this file — presumably for persisted state; confirm. */
lastFailure?: number;
/** NOTE(review): not read or written by the registry in this file — presumably for persisted state; confirm. */
circuitOpen?: boolean;
}
/**
* Contract for a callback registry: register/unregister callbacks, fire
* events to them, and inspect what is registered.
*/
export interface CallbackRegistry {
/** Stores `callback` under `id`. */
register(id: string, callback: CallbackRecord): Promise<void>;
/** Removes the callback stored under `id` together with its associated state. */
unregister(id: string): Promise<void>;
/** Delivers `event` to the registered callbacks that are enabled. */
fire(event: CallbackEvent): Promise<void>;
/** Returns all registered callback records, enabled or not. */
getRegistered(): Promise<CallbackRecord[]>;
/**
* Returns aggregate counters:
* `total` registered callbacks, how many are `enabled`, how many circuits
* are open (`circuitOpen`), and `lastActivity` — in the implementation in
* this file, the latest of the records' `createdAt` values and the most
* recent failure times.
*/
getStatus(): Promise<{
total: number;
enabled: number;
circuitOpen: number;
lastActivity: number;
}>;
}
/**
 * Optional tuning parameters for {@link CallbackRegistryImpl}.
 * Omitted, non-finite or out-of-range values fall back to the listed default.
 */
export interface CallbackRegistryOptions {
  /** Interval between scans of the retry queue, in milliseconds (>= 1). Default 5000. */
  retryIntervalMs?: number;
  /** Maximum number of retries per failed event (>= 0). Default 5. */
  maxRetries?: number;
  /** Consecutive failures after which a callback's circuit opens (>= 1). Default 5. */
  failureThreshold?: number;
  /**
   * Time an open circuit blocks delivery before a single trial delivery is
   * let through, in milliseconds (>= 0). Default 60000.
   */
  circuitResetMs?: number;
  /** Upper bound for one HTTP delivery, in milliseconds; 0 disables the timeout. Default 30000. */
  httpTimeoutMs?: number;
}

/** Failure bookkeeping kept per callback id. */
interface CircuitBreakerState {
  failures: number;
  lastFailure: number;
  open: boolean;
}

/** One queued redelivery: the event, which retry it is, and when it becomes due. */
interface PendingRetry {
  event: CallbackEvent;
  attempt: number;
  dueAt: number;
}

/**
 * Callback Registry Implementation
 * Handles callback registration, delivery, retries and circuit breaker logic.
 *
 * Behaviour summary:
 * - `fire` delivers an event to every enabled callback, one after another.
 * - A failed delivery increments the callback's consecutive-failure counter
 *   and queues a retry with exponential backoff (1s, 2s, 4s ... capped at 60s).
 *   Retries stop after `maxRetries` attempts for that event.
 * - After `failureThreshold` consecutive failures the circuit opens and
 *   deliveries are skipped. Once `circuitResetMs` has passed since the last
 *   failure, one trial delivery is allowed; success closes the circuit.
 * - The retry timer is unref'd where the runtime supports it, so it does not
 *   keep a Node.js process alive; call {@link dispose} to stop it explicitly.
 */
export class CallbackRegistryImpl implements CallbackRegistry {
  private readonly callbacks = new Map<string, CallbackRecord>();
  private readonly localCallbacks = new Map<string, CallbackFunction>();
  private readonly retryQueue = new Map<string, PendingRetry[]>();
  private readonly circuitBreakers = new Map<string, CircuitBreakerState>();

  private readonly retryIntervalMs: number;
  private readonly maxRetries: number;
  private readonly failureThreshold: number;
  private readonly circuitResetMs: number;
  private readonly httpTimeoutMs: number;

  private retryTimer: ReturnType<typeof setInterval> | undefined;
  /** Guards against a slow retry pass overlapping with the next timer tick. */
  private retryPassRunning = false;

  /**
   * @param options Optional tuning; with no argument the registry behaves
   *   with the historical constants (5s retry scan, 5 retries, 5 failures).
   */
  constructor(options: CallbackRegistryOptions = {}) {
    this.retryIntervalMs = CallbackRegistryImpl.atLeast(options.retryIntervalMs, 1, 5000);
    this.maxRetries = CallbackRegistryImpl.atLeast(options.maxRetries, 0, 5);
    this.failureThreshold = CallbackRegistryImpl.atLeast(options.failureThreshold, 1, 5);
    this.circuitResetMs = CallbackRegistryImpl.atLeast(options.circuitResetMs, 0, 60000);
    this.httpTimeoutMs = CallbackRegistryImpl.atLeast(options.httpTimeoutMs, 0, 30000);
    this.startRetryProcessor();
  }

  /**
   * Stores `callback` under `id` and makes sure a circuit breaker exists for it.
   * Re-registering an id keeps its existing circuit-breaker state.
   *
   * @param id Key the callback is stored under.
   * @param callback Record describing the callback.
   */
  async register(id: string, callback: CallbackRecord): Promise<void> {
    // Delivery, breaker and local-handler lookups all go through record.id,
    // so the stored record must carry the key it is registered under.
    // The caller's object is kept as-is whenever the ids already agree.
    const record = callback.id === id ? callback : { ...callback, id };
    this.callbacks.set(id, record);
    if (!this.circuitBreakers.has(id)) {
      this.circuitBreakers.set(id, { failures: 0, lastFailure: 0, open: false });
    }
    console.log(`DNP-CB-REGISTER: Callback ${id} registered (${callback.kind})`);
  }

  /**
   * Removes the callback and all state kept for it (local handler, pending
   * retries, circuit breaker).
   */
  async unregister(id: string): Promise<void> {
    this.callbacks.delete(id);
    this.localCallbacks.delete(id);
    this.retryQueue.delete(id);
    this.circuitBreakers.delete(id);
    console.log(`DNP-CB-UNREGISTER: Callback ${id} unregistered`);
  }

  /**
   * Delivers `event` to every enabled callback in registration order.
   * Delivery errors never propagate to the caller: they are logged, counted
   * by the circuit breaker and queued for retry.
   */
  async fire(event: CallbackEvent): Promise<void> {
    const enabledCallbacks = Array.from(this.callbacks.values()).filter(cb => cb.enabled);
    console.log(`DNP-CB-FIRE: Firing event ${event.type} to ${enabledCallbacks.length} callbacks`);
    for (const callback of enabledCallbacks) {
      try {
        await this.deliverCallback(callback, event);
      } catch (error) {
        console.error(`DNP-CB-FIRE-ERROR: Failed to deliver to ${callback.id}`, error);
        this.handleCallbackFailure(callback, event, error);
      }
    }
  }

  /** Returns all registered records, enabled or not. */
  async getRegistered(): Promise<CallbackRecord[]> {
    return Array.from(this.callbacks.values());
  }

  /**
   * Returns aggregate counters for the registry.
   * `lastActivity` is the latest of all `createdAt` values and failure
   * times, or 0 when nothing is registered (never `-Infinity`).
   */
  async getStatus(): Promise<{
    total: number;
    enabled: number;
    circuitOpen: number;
    lastActivity: number;
  }> {
    let enabled = 0;
    let lastActivity = 0;
    for (const record of this.callbacks.values()) {
      if (record.enabled) enabled++;
      lastActivity = Math.max(lastActivity, record.createdAt);
    }
    let circuitOpen = 0;
    for (const breaker of this.circuitBreakers.values()) {
      if (breaker.open) circuitOpen++;
      lastActivity = Math.max(lastActivity, breaker.lastFailure);
    }
    return { total: this.callbacks.size, enabled, circuitOpen, lastActivity };
  }

  /**
   * Stops the background retry timer. Pending retries stay queued but are
   * no longer processed. Safe to call more than once.
   */
  dispose(): void {
    if (this.retryTimer !== undefined) {
      clearInterval(this.retryTimer);
      this.retryTimer = undefined;
    }
  }

  // Register local callback function
  registerLocalCallback(id: string, callback: CallbackFunction): void {
    this.localCallbacks.set(id, callback);
    console.log(`DNP-CB-LOCAL: Local callback ${id} registered`);
  }

  // Unregister local callback function
  unregisterLocalCallback(id: string): void {
    this.localCallbacks.delete(id);
    console.log(`DNP-CB-LOCAL: Local callback ${id} unregistered`);
  }

  /** Returns `value` when it is a finite number >= `min`, otherwise `fallback`. */
  private static atLeast(value: number | undefined, min: number, fallback: number): number {
    return value !== undefined && Number.isFinite(value) && value >= min ? value : fallback;
  }

  /** True when a timer handle exposes `unref()` (Node.js `Timeout`; browser handles are plain numbers). */
  private static hasUnref(handle: unknown): handle is { unref(): unknown } {
    return (
      typeof handle === 'object' &&
      handle !== null &&
      typeof (handle as { unref?: unknown }).unref === 'function'
    );
  }

  /**
   * True while an open circuit is still inside its cool-down window.
   * After the window a delivery is let through as a trial even though
   * `open` is still set; a success then closes the circuit.
   */
  private isCircuitBlocking(breaker: CircuitBreakerState): boolean {
    return breaker.open && Date.now() - breaker.lastFailure < this.circuitResetMs;
  }

  /**
   * Delivers one event to one callback according to its kind.
   * Resolves without delivering when the circuit is blocking; throws when
   * the delivery itself fails. A success resets the circuit breaker.
   */
  private async deliverCallback(callback: CallbackRecord, event: CallbackEvent): Promise<void> {
    const circuitBreaker = this.circuitBreakers.get(callback.id);
    if (circuitBreaker && this.isCircuitBlocking(circuitBreaker)) {
      console.warn(`DNP-CB-CIRCUIT: Circuit open for ${callback.id}, skipping delivery`);
      return;
    }
    const start = performance.now();
    switch (callback.kind) {
      case 'http':
        await this.deliverHttpCallback(callback, event);
        break;
      case 'local':
        await this.deliverLocalCallback(callback, event);
        break;
      case 'queue':
        await this.deliverQueueCallback(callback, event);
        break;
      default:
        throw new Error(`Unknown callback kind: ${callback.kind}`);
    }
    // Reset circuit breaker on success
    if (circuitBreaker) {
      circuitBreaker.failures = 0;
      circuitBreaker.open = false;
    }
    const duration = performance.now() - start;
    console.log(`DNP-CB-SUCCESS: Delivered to ${callback.id} in ${duration.toFixed(2)}ms`);
  }

  /**
   * POSTs the event as JSON to `callback.target`.
   *
   * @throws Error on a non-2xx response, a network error, or when the
   *   request exceeds the configured HTTP timeout (the request is aborted).
   */
  private async deliverHttpCallback(callback: CallbackRecord, event: CallbackEvent): Promise<void> {
    const controller = this.httpTimeoutMs > 0 ? new AbortController() : undefined;
    const timeout = controller
      ? setTimeout(() => controller.abort(), this.httpTimeoutMs)
      : undefined;
    try {
      const response = await fetch(callback.target, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          ...callback.headers
        },
        body: JSON.stringify({
          ...event,
          callbackId: callback.id,
          timestamp: Date.now()
        }),
        signal: controller ? controller.signal : null
      });
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }
    } finally {
      // Always release the timer so a finished request leaves nothing pending.
      if (timeout !== undefined) clearTimeout(timeout);
    }
  }

  /**
   * Invokes the function registered through `registerLocalCallback` under
   * the callback's id.
   *
   * @throws Error when no local function is registered for that id.
   */
  private async deliverLocalCallback(callback: CallbackRecord, event: CallbackEvent): Promise<void> {
    const localCallback = this.localCallbacks.get(callback.id);
    if (!localCallback) {
      throw new Error(`Local callback ${callback.id} not found`);
    }
    await localCallback(event);
  }

  private async deliverQueueCallback(callback: CallbackRecord, event: CallbackEvent): Promise<void> {
    // Queue callback implementation would go here
    // For now, just log the event
    console.log(`DNP-CB-QUEUE: Queued event ${event.type} for ${callback.id}`);
  }

  /**
   * Records a failed delivery: bumps the consecutive-failure counter, opens
   * the circuit at the threshold, and queues the event for another attempt.
   *
   * @param attempt Number of retries already made for this event. For a
   *   first failure it is seeded from `callback.retryCount` (0 when unset).
   */
  private handleCallbackFailure(
    callback: CallbackRecord,
    event: CallbackEvent,
    error: unknown,
    attempt: number = callback.retryCount ?? 0
  ): void {
    const circuitBreaker = this.circuitBreakers.get(callback.id);
    if (circuitBreaker) {
      circuitBreaker.failures++;
      circuitBreaker.lastFailure = Date.now();
      if (circuitBreaker.failures >= this.failureThreshold) {
        circuitBreaker.open = true;
        console.error(`DNP-CB-CIRCUIT-OPEN: Circuit opened for ${callback.id} after ${circuitBreaker.failures} failures`);
      }
    }
    // Schedule retry with exponential backoff
    this.scheduleRetry(callback, event, attempt);
    console.error(`DNP-CB-FAILURE: Callback ${callback.id} failed`, error);
  }

  /**
   * Queues `event` for redelivery unless its retry budget is used up.
   * The entry becomes due after an exponential backoff based on `attempt`.
   */
  private scheduleRetry(callback: CallbackRecord, event: CallbackEvent, attempt: number): void {
    if (attempt >= this.maxRetries) {
      console.warn(`DNP-CB-RETRY-LIMIT: Max retries reached for ${callback.id}`);
      return;
    }
    const backoffMs = Math.min(1000 * Math.pow(2, attempt), 60000); // Cap at 1 minute
    const nextAttempt = attempt + 1;
    // The retried event carries its retry number so receivers can tell a
    // redelivery from the first delivery.
    const retryEvent = { ...event, retryCount: nextAttempt };
    let queue = this.retryQueue.get(callback.id);
    if (!queue) {
      queue = [];
      this.retryQueue.set(callback.id, queue);
    }
    queue.push({ event: retryEvent, attempt: nextAttempt, dueAt: Date.now() + backoffMs });
    console.log(`DNP-CB-RETRY: Scheduled retry ${nextAttempt} for ${callback.id} in ${backoffMs}ms`);
  }

  /** Starts the periodic retry scan; does nothing if it is already running. */
  private startRetryProcessor(): void {
    if (this.retryTimer !== undefined) return;
    const timer = setInterval(() => {
      this.processRetryQueue().catch(error => {
        console.error('DNP-CB-RETRY-ERROR: Retry pass failed', error);
      });
    }, this.retryIntervalMs);
    this.retryTimer = timer;
    // A background retry scan must not be the only thing keeping a process alive.
    if (CallbackRegistryImpl.hasUnref(timer)) {
      timer.unref();
    }
  }

  /**
   * One pass over the retry queue: for each callback, redelivers at most one
   * entry whose backoff has elapsed. Entries of callbacks whose circuit is
   * blocking stay queued; a failed retry is counted and re-queued until the
   * retry budget is exhausted.
   */
  private async processRetryQueue(): Promise<void> {
    if (this.retryPassRunning) return;
    this.retryPassRunning = true;
    try {
      // Snapshot the entries: deliveries below may add or remove queue keys.
      for (const [callbackId, pending] of Array.from(this.retryQueue.entries())) {
        const callback = this.callbacks.get(callbackId);
        if (!callback || pending.length === 0) {
          this.retryQueue.delete(callbackId);
          continue;
        }
        const circuitBreaker = this.circuitBreakers.get(callbackId);
        if (circuitBreaker && this.isCircuitBlocking(circuitBreaker)) continue;
        const now = Date.now();
        const dueIndex = pending.findIndex(entry => entry.dueAt <= now);
        if (dueIndex === -1) continue;
        const [entry] = pending.splice(dueIndex, 1);
        if (!entry) continue;
        try {
          await this.deliverCallback(callback, entry.event);
        } catch (error) {
          console.error(`DNP-CB-RETRY-FAILED: Retry failed for ${callbackId}`, error);
          this.handleCallbackFailure(callback, entry.event, error, entry.attempt);
        }
      }
    } finally {
      this.retryPassRunning = false;
    }
  }
}
// Singleton instance shared by everything that imports this module.
// Constructing it starts the registry's periodic retry processor, so that
// side effect happens as soon as the module is loaded.
export const callbackRegistry = new CallbackRegistryImpl();