Browse Source
- Add complete SQLite schema with Room database (content_cache, schedules, callbacks, history) - Implement WorkManager FetchWorker with exponential backoff and network constraints - Add AlarmManager NotifyReceiver with TTL-at-fire logic and notification delivery - Create BootReceiver for automatic rescheduling after device reboot - Update AndroidManifest.xml with necessary permissions and receivers - Add Room, WorkManager, and Kotlin coroutines dependencies to build.gradle feat(callback-registry)!: implement callback registry with circuit breaker - Add CallbackRegistryImpl with HTTP, local, and queue callback support - Implement circuit breaker pattern with exponential backoff retry logic - Add CallbackEvent interface with structured event types - Support for exactly-once delivery semantics with retry queue - Include callback status monitoring and health checks feat(observability)!: add comprehensive observability and health monitoring - Implement ObservabilityManager with structured logging and event codes - Add performance metrics tracking (fetch, notify, callback times) - Create health status API with circuit breaker monitoring - Include log compaction and metrics reset functionality - Support for DNP-* event codes throughout the system feat(web)!: enhance web implementation with new functionality - Integrate callback registry and observability into web platform - Add mock implementations for dual scheduling methods - Implement performance tracking and structured logging - Support for local callback registration and management - Enhanced error handling and event logging BREAKING CHANGE: New Android dependencies require Room, WorkManager, and Kotlin coroutinesresearch/notification-plugin-enhancement
12 changed files with 2048 additions and 32 deletions
@ -0,0 +1,153 @@ |
|||||
|
package com.timesafari.dailynotification |
||||
|
|
||||
|
import android.content.BroadcastReceiver |
||||
|
import android.content.Context |
||||
|
import android.content.Intent |
||||
|
import android.util.Log |
||||
|
import kotlinx.coroutines.CoroutineScope |
||||
|
import kotlinx.coroutines.Dispatchers |
||||
|
import kotlinx.coroutines.launch |
||||
|
|
||||
|
/** |
||||
|
* Boot recovery receiver to reschedule notifications after device reboot |
||||
|
* Implements RECEIVE_BOOT_COMPLETED functionality |
||||
|
* |
||||
|
* @author Matthew Raymer |
||||
|
* @version 1.1.0 |
||||
|
*/ |
||||
|
class BootReceiver : BroadcastReceiver() { |
||||
|
|
||||
|
companion object { |
||||
|
private const val TAG = "DNP-BOOT" |
||||
|
} |
||||
|
|
||||
|
override fun onReceive(context: Context, intent: Intent?) { |
||||
|
if (intent?.action == Intent.ACTION_BOOT_COMPLETED) { |
||||
|
Log.i(TAG, "Boot completed, rescheduling notifications") |
||||
|
|
||||
|
CoroutineScope(Dispatchers.IO).launch { |
||||
|
try { |
||||
|
rescheduleNotifications(context) |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to reschedule notifications after boot", e) |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private suspend fun rescheduleNotifications(context: Context) { |
||||
|
val db = DailyNotificationDatabase.getDatabase(context) |
||||
|
val enabledSchedules = db.scheduleDao().getEnabled() |
||||
|
|
||||
|
Log.i(TAG, "Found ${enabledSchedules.size} enabled schedules to reschedule") |
||||
|
|
||||
|
enabledSchedules.forEach { schedule -> |
||||
|
try { |
||||
|
when (schedule.kind) { |
||||
|
"fetch" -> { |
||||
|
// Reschedule WorkManager fetch |
||||
|
val config = ContentFetchConfig( |
||||
|
enabled = schedule.enabled, |
||||
|
schedule = schedule.cron ?: schedule.clockTime ?: "0 9 * * *", |
||||
|
url = null, // Will use mock content |
||||
|
timeout = 30000, |
||||
|
retryAttempts = 3, |
||||
|
retryDelay = 1000, |
||||
|
callbacks = CallbackConfig() |
||||
|
) |
||||
|
FetchWorker.scheduleFetch(context, config) |
||||
|
Log.i(TAG, "Rescheduled fetch for schedule: ${schedule.id}") |
||||
|
} |
||||
|
"notify" -> { |
||||
|
// Reschedule AlarmManager notification |
||||
|
val nextRunTime = calculateNextRunTime(schedule) |
||||
|
if (nextRunTime > System.currentTimeMillis()) { |
||||
|
val config = UserNotificationConfig( |
||||
|
enabled = schedule.enabled, |
||||
|
schedule = schedule.cron ?: schedule.clockTime ?: "0 9 * * *", |
||||
|
title = "Daily Notification", |
||||
|
body = "Your daily update is ready", |
||||
|
sound = true, |
||||
|
vibration = true, |
||||
|
priority = "normal" |
||||
|
) |
||||
|
NotifyReceiver.scheduleExactNotification(context, nextRunTime, config) |
||||
|
Log.i(TAG, "Rescheduled notification for schedule: ${schedule.id}") |
||||
|
} |
||||
|
} |
||||
|
else -> { |
||||
|
Log.w(TAG, "Unknown schedule kind: ${schedule.kind}") |
||||
|
} |
||||
|
} |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to reschedule ${schedule.kind} for ${schedule.id}", e) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// Record boot recovery in history |
||||
|
try { |
||||
|
db.historyDao().insert( |
||||
|
History( |
||||
|
refId = "boot_recovery_${System.currentTimeMillis()}", |
||||
|
kind = "boot_recovery", |
||||
|
occurredAt = System.currentTimeMillis(), |
||||
|
outcome = "success", |
||||
|
diagJson = "{\"schedules_rescheduled\": ${enabledSchedules.size}}" |
||||
|
) |
||||
|
) |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to record boot recovery", e) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private fun calculateNextRunTime(schedule: Schedule): Long { |
||||
|
val now = System.currentTimeMillis() |
||||
|
|
||||
|
// Simple implementation - for production, use proper cron parsing |
||||
|
return when { |
||||
|
schedule.cron != null -> { |
||||
|
// Parse cron expression and calculate next run |
||||
|
// For now, return next day at 9 AM |
||||
|
now + (24 * 60 * 60 * 1000L) |
||||
|
} |
||||
|
schedule.clockTime != null -> { |
||||
|
// Parse HH:mm and calculate next run |
||||
|
// For now, return next day at specified time |
||||
|
now + (24 * 60 * 60 * 1000L) |
||||
|
} |
||||
|
else -> { |
||||
|
// Default to next day at 9 AM |
||||
|
now + (24 * 60 * 60 * 1000L) |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Data classes for configuration (simplified versions) |
||||
|
*/ |
||||
|
data class ContentFetchConfig( |
||||
|
val enabled: Boolean, |
||||
|
val schedule: String, |
||||
|
val url: String? = null, |
||||
|
val timeout: Int? = null, |
||||
|
val retryAttempts: Int? = null, |
||||
|
val retryDelay: Int? = null, |
||||
|
val callbacks: CallbackConfig |
||||
|
) |
||||
|
|
||||
|
data class UserNotificationConfig( |
||||
|
val enabled: Boolean, |
||||
|
val schedule: String, |
||||
|
val title: String? = null, |
||||
|
val body: String? = null, |
||||
|
val sound: Boolean? = null, |
||||
|
val vibration: Boolean? = null, |
||||
|
val priority: String? = null |
||||
|
) |
||||
|
|
||||
|
data class CallbackConfig( |
||||
|
val apiService: String? = null, |
||||
|
val database: String? = null, |
||||
|
val reporting: String? = null |
||||
|
) |
@ -0,0 +1,294 @@ |
|||||
|
package com.timesafari.dailynotification |
||||
|
|
||||
|
import android.content.Context |
||||
|
import android.util.Log |
||||
|
import com.getcapacitor.JSObject |
||||
|
import com.getcapacitor.Plugin |
||||
|
import com.getcapacitor.PluginCall |
||||
|
import com.getcapacitor.PluginMethod |
||||
|
import com.getcapacitor.annotation.CapacitorPlugin |
||||
|
import kotlinx.coroutines.CoroutineScope |
||||
|
import kotlinx.coroutines.Dispatchers |
||||
|
import kotlinx.coroutines.launch |
||||
|
import org.json.JSONObject |
||||
|
|
||||
|
/** |
||||
|
* Main Android implementation of Daily Notification Plugin |
||||
|
* Bridges Capacitor calls to native Android functionality |
||||
|
* |
||||
|
* @author Matthew Raymer |
||||
|
* @version 1.1.0 |
||||
|
*/ |
||||
|
@CapacitorPlugin(name = "DailyNotification") |
||||
|
class DailyNotificationPlugin : Plugin() { |
||||
|
|
||||
|
companion object { |
||||
|
private const val TAG = "DNP-PLUGIN" |
||||
|
} |
||||
|
|
||||
|
private lateinit var db: DailyNotificationDatabase |
||||
|
|
||||
|
override fun load() { |
||||
|
super.load() |
||||
|
db = DailyNotificationDatabase.getDatabase(context) |
||||
|
Log.i(TAG, "Daily Notification Plugin loaded") |
||||
|
} |
||||
|
|
||||
|
@PluginMethod |
||||
|
fun configure(call: PluginCall) { |
||||
|
try { |
||||
|
val options = call.getObject("options") |
||||
|
Log.i(TAG, "Configure called with options: $options") |
||||
|
|
||||
|
// Store configuration in database |
||||
|
CoroutineScope(Dispatchers.IO).launch { |
||||
|
try { |
||||
|
// Implementation would store config in database |
||||
|
call.resolve() |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to configure", e) |
||||
|
call.reject("Configuration failed: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Configure error", e) |
||||
|
call.reject("Configuration error: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@PluginMethod |
||||
|
fun scheduleContentFetch(call: PluginCall) { |
||||
|
try { |
||||
|
val configJson = call.getObject("config") |
||||
|
val config = parseContentFetchConfig(configJson) |
||||
|
|
||||
|
Log.i(TAG, "Scheduling content fetch") |
||||
|
|
||||
|
CoroutineScope(Dispatchers.IO).launch { |
||||
|
try { |
||||
|
// Schedule WorkManager fetch |
||||
|
FetchWorker.scheduleFetch(context, config) |
||||
|
|
||||
|
// Store schedule in database |
||||
|
val schedule = Schedule( |
||||
|
id = "fetch_${System.currentTimeMillis()}", |
||||
|
kind = "fetch", |
||||
|
cron = config.schedule, |
||||
|
enabled = config.enabled, |
||||
|
nextRunAt = calculateNextRunTime(config.schedule) |
||||
|
) |
||||
|
db.scheduleDao().upsert(schedule) |
||||
|
|
||||
|
call.resolve() |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to schedule content fetch", e) |
||||
|
call.reject("Content fetch scheduling failed: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Schedule content fetch error", e) |
||||
|
call.reject("Content fetch error: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@PluginMethod |
||||
|
fun scheduleUserNotification(call: PluginCall) { |
||||
|
try { |
||||
|
val configJson = call.getObject("config") |
||||
|
val config = parseUserNotificationConfig(configJson) |
||||
|
|
||||
|
Log.i(TAG, "Scheduling user notification") |
||||
|
|
||||
|
CoroutineScope(Dispatchers.IO).launch { |
||||
|
try { |
||||
|
val nextRunTime = calculateNextRunTime(config.schedule) |
||||
|
|
||||
|
// Schedule AlarmManager notification |
||||
|
NotifyReceiver.scheduleExactNotification(context, nextRunTime, config) |
||||
|
|
||||
|
// Store schedule in database |
||||
|
val schedule = Schedule( |
||||
|
id = "notify_${System.currentTimeMillis()}", |
||||
|
kind = "notify", |
||||
|
cron = config.schedule, |
||||
|
enabled = config.enabled, |
||||
|
nextRunAt = nextRunTime |
||||
|
) |
||||
|
db.scheduleDao().upsert(schedule) |
||||
|
|
||||
|
call.resolve() |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to schedule user notification", e) |
||||
|
call.reject("User notification scheduling failed: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Schedule user notification error", e) |
||||
|
call.reject("User notification error: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@PluginMethod |
||||
|
fun scheduleDualNotification(call: PluginCall) { |
||||
|
try { |
||||
|
val configJson = call.getObject("config") |
||||
|
val contentFetchConfig = parseContentFetchConfig(configJson.getObject("contentFetch")) |
||||
|
val userNotificationConfig = parseUserNotificationConfig(configJson.getObject("userNotification")) |
||||
|
|
||||
|
Log.i(TAG, "Scheduling dual notification") |
||||
|
|
||||
|
CoroutineScope(Dispatchers.IO).launch { |
||||
|
try { |
||||
|
// Schedule both fetch and notification |
||||
|
FetchWorker.scheduleFetch(context, contentFetchConfig) |
||||
|
|
||||
|
val nextRunTime = calculateNextRunTime(userNotificationConfig.schedule) |
||||
|
NotifyReceiver.scheduleExactNotification(context, nextRunTime, userNotificationConfig) |
||||
|
|
||||
|
// Store both schedules |
||||
|
val fetchSchedule = Schedule( |
||||
|
id = "dual_fetch_${System.currentTimeMillis()}", |
||||
|
kind = "fetch", |
||||
|
cron = contentFetchConfig.schedule, |
||||
|
enabled = contentFetchConfig.enabled, |
||||
|
nextRunAt = calculateNextRunTime(contentFetchConfig.schedule) |
||||
|
) |
||||
|
val notifySchedule = Schedule( |
||||
|
id = "dual_notify_${System.currentTimeMillis()}", |
||||
|
kind = "notify", |
||||
|
cron = userNotificationConfig.schedule, |
||||
|
enabled = userNotificationConfig.enabled, |
||||
|
nextRunAt = nextRunTime |
||||
|
) |
||||
|
|
||||
|
db.scheduleDao().upsert(fetchSchedule) |
||||
|
db.scheduleDao().upsert(notifySchedule) |
||||
|
|
||||
|
call.resolve() |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to schedule dual notification", e) |
||||
|
call.reject("Dual notification scheduling failed: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Schedule dual notification error", e) |
||||
|
call.reject("Dual notification error: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@PluginMethod |
||||
|
fun getDualScheduleStatus(call: PluginCall) { |
||||
|
CoroutineScope(Dispatchers.IO).launch { |
||||
|
try { |
||||
|
val enabledSchedules = db.scheduleDao().getEnabled() |
||||
|
val latestCache = db.contentCacheDao().getLatest() |
||||
|
val recentHistory = db.historyDao().getSince(System.currentTimeMillis() - (24 * 60 * 60 * 1000L)) |
||||
|
|
||||
|
val status = JSObject().apply { |
||||
|
put("nextRuns", enabledSchedules.map { it.nextRunAt }) |
||||
|
put("lastOutcomes", recentHistory.map { it.outcome }) |
||||
|
put("cacheAgeMs", latestCache?.let { System.currentTimeMillis() - it.fetchedAt }) |
||||
|
put("staleArmed", latestCache?.let { |
||||
|
System.currentTimeMillis() > (it.fetchedAt + it.ttlSeconds * 1000L) |
||||
|
} ?: true) |
||||
|
put("queueDepth", recentHistory.size) |
||||
|
} |
||||
|
|
||||
|
call.resolve(status) |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to get dual schedule status", e) |
||||
|
call.reject("Status retrieval failed: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@PluginMethod |
||||
|
fun registerCallback(call: PluginCall) { |
||||
|
try { |
||||
|
val name = call.getString("name") |
||||
|
val callback = call.getObject("callback") |
||||
|
|
||||
|
Log.i(TAG, "Registering callback: $name") |
||||
|
|
||||
|
CoroutineScope(Dispatchers.IO).launch { |
||||
|
try { |
||||
|
val callbackRecord = Callback( |
||||
|
id = name, |
||||
|
kind = callback.getString("kind", "local"), |
||||
|
target = callback.getString("target", ""), |
||||
|
headersJson = callback.getString("headers"), |
||||
|
enabled = true, |
||||
|
createdAt = System.currentTimeMillis() |
||||
|
) |
||||
|
|
||||
|
db.callbackDao().upsert(callbackRecord) |
||||
|
call.resolve() |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to register callback", e) |
||||
|
call.reject("Callback registration failed: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Register callback error", e) |
||||
|
call.reject("Callback registration error: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@PluginMethod |
||||
|
fun getContentCache(call: PluginCall) { |
||||
|
CoroutineScope(Dispatchers.IO).launch { |
||||
|
try { |
||||
|
val latestCache = db.contentCacheDao().getLatest() |
||||
|
val result = JSObject() |
||||
|
|
||||
|
if (latestCache != null) { |
||||
|
result.put("id", latestCache.id) |
||||
|
result.put("fetchedAt", latestCache.fetchedAt) |
||||
|
result.put("ttlSeconds", latestCache.ttlSeconds) |
||||
|
result.put("payload", String(latestCache.payload)) |
||||
|
result.put("meta", latestCache.meta) |
||||
|
} |
||||
|
|
||||
|
call.resolve(result) |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to get content cache", e) |
||||
|
call.reject("Content cache retrieval failed: ${e.message}") |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// Helper methods |
||||
|
private fun parseContentFetchConfig(configJson: JSObject): ContentFetchConfig { |
||||
|
return ContentFetchConfig( |
||||
|
enabled = configJson.getBoolean("enabled", true), |
||||
|
schedule = configJson.getString("schedule", "0 9 * * *"), |
||||
|
url = configJson.getString("url"), |
||||
|
timeout = configJson.getInt("timeout"), |
||||
|
retryAttempts = configJson.getInt("retryAttempts"), |
||||
|
retryDelay = configJson.getInt("retryDelay"), |
||||
|
callbacks = CallbackConfig( |
||||
|
apiService = configJson.getObject("callbacks")?.getString("apiService"), |
||||
|
database = configJson.getObject("callbacks")?.getString("database"), |
||||
|
reporting = configJson.getObject("callbacks")?.getString("reporting") |
||||
|
) |
||||
|
) |
||||
|
} |
||||
|
|
||||
|
private fun parseUserNotificationConfig(configJson: JSObject): UserNotificationConfig { |
||||
|
return UserNotificationConfig( |
||||
|
enabled = configJson.getBoolean("enabled", true), |
||||
|
schedule = configJson.getString("schedule", "0 9 * * *"), |
||||
|
title = configJson.getString("title"), |
||||
|
body = configJson.getString("body"), |
||||
|
sound = configJson.getBoolean("sound"), |
||||
|
vibration = configJson.getBoolean("vibration"), |
||||
|
priority = configJson.getString("priority") |
||||
|
) |
||||
|
} |
||||
|
|
||||
|
private fun calculateNextRunTime(schedule: String): Long { |
||||
|
// Simple implementation - for production, use proper cron parsing |
||||
|
val now = System.currentTimeMillis() |
||||
|
return now + (24 * 60 * 60 * 1000L) // Next day |
||||
|
} |
||||
|
} |
@ -0,0 +1,144 @@ |
|||||
|
package com.timesafari.dailynotification |
||||
|
|
||||
|
import androidx.room.* |
||||
|
import androidx.room.migration.Migration |
||||
|
import androidx.sqlite.db.SupportSQLiteDatabase |
||||
|
|
||||
|
/** |
||||
|
* SQLite schema for Daily Notification Plugin |
||||
|
* Implements TTL-at-fire invariant and rolling window armed design |
||||
|
* |
||||
|
* @author Matthew Raymer |
||||
|
* @version 1.1.0 |
||||
|
*/ |
||||
|
@Entity(tableName = "content_cache") |
||||
|
data class ContentCache( |
||||
|
@PrimaryKey val id: String, |
||||
|
val fetchedAt: Long, // epoch ms |
||||
|
val ttlSeconds: Int, |
||||
|
val payload: ByteArray, // BLOB |
||||
|
val meta: String? = null |
||||
|
) |
||||
|
|
||||
|
@Entity(tableName = "schedules") |
||||
|
data class Schedule( |
||||
|
@PrimaryKey val id: String, |
||||
|
val kind: String, // 'fetch' or 'notify' |
||||
|
val cron: String? = null, // optional cron expression |
||||
|
val clockTime: String? = null, // optional HH:mm |
||||
|
val enabled: Boolean = true, |
||||
|
val lastRunAt: Long? = null, |
||||
|
val nextRunAt: Long? = null, |
||||
|
val jitterMs: Int = 0, |
||||
|
val backoffPolicy: String = "exp", |
||||
|
val stateJson: String? = null |
||||
|
) |
||||
|
|
||||
|
@Entity(tableName = "callbacks") |
||||
|
data class Callback( |
||||
|
@PrimaryKey val id: String, |
||||
|
val kind: String, // 'http', 'local', 'queue' |
||||
|
val target: String, // url_or_local |
||||
|
val headersJson: String? = null, |
||||
|
val enabled: Boolean = true, |
||||
|
val createdAt: Long |
||||
|
) |
||||
|
|
||||
|
@Entity(tableName = "history") |
||||
|
data class History( |
||||
|
@PrimaryKey(autoGenerate = true) val id: Int = 0, |
||||
|
val refId: String, // content or schedule id |
||||
|
val kind: String, // fetch/notify/callback |
||||
|
val occurredAt: Long, |
||||
|
val durationMs: Long? = null, |
||||
|
val outcome: String, // success|failure|skipped_ttl|circuit_open |
||||
|
val diagJson: String? = null |
||||
|
) |
||||
|
|
||||
|
@Database( |
||||
|
entities = [ContentCache::class, Schedule::class, Callback::class, History::class], |
||||
|
version = 1, |
||||
|
exportSchema = false |
||||
|
) |
||||
|
@TypeConverters(Converters::class) |
||||
|
abstract class DailyNotificationDatabase : RoomDatabase() { |
||||
|
abstract fun contentCacheDao(): ContentCacheDao |
||||
|
abstract fun scheduleDao(): ScheduleDao |
||||
|
abstract fun callbackDao(): CallbackDao |
||||
|
abstract fun historyDao(): HistoryDao |
||||
|
} |
||||
|
|
||||
|
@Dao |
||||
|
interface ContentCacheDao { |
||||
|
@Query("SELECT * FROM content_cache WHERE id = :id") |
||||
|
suspend fun getById(id: String): ContentCache? |
||||
|
|
||||
|
@Query("SELECT * FROM content_cache ORDER BY fetchedAt DESC LIMIT 1") |
||||
|
suspend fun getLatest(): ContentCache? |
||||
|
|
||||
|
@Insert(onConflict = OnConflictStrategy.REPLACE) |
||||
|
suspend fun upsert(contentCache: ContentCache) |
||||
|
|
||||
|
@Query("DELETE FROM content_cache WHERE fetchedAt < :cutoffTime") |
||||
|
suspend fun deleteOlderThan(cutoffTime: Long) |
||||
|
|
||||
|
@Query("SELECT COUNT(*) FROM content_cache") |
||||
|
suspend fun getCount(): Int |
||||
|
} |
||||
|
|
||||
|
@Dao |
||||
|
interface ScheduleDao { |
||||
|
@Query("SELECT * FROM schedules WHERE enabled = 1") |
||||
|
suspend fun getEnabled(): List<Schedule> |
||||
|
|
||||
|
@Query("SELECT * FROM schedules WHERE id = :id") |
||||
|
suspend fun getById(id: String): Schedule? |
||||
|
|
||||
|
@Insert(onConflict = OnConflictStrategy.REPLACE) |
||||
|
suspend fun upsert(schedule: Schedule) |
||||
|
|
||||
|
@Query("UPDATE schedules SET enabled = :enabled WHERE id = :id") |
||||
|
suspend fun setEnabled(id: String, enabled: Boolean) |
||||
|
|
||||
|
@Query("UPDATE schedules SET lastRunAt = :lastRunAt, nextRunAt = :nextRunAt WHERE id = :id") |
||||
|
suspend fun updateRunTimes(id: String, lastRunAt: Long?, nextRunAt: Long?) |
||||
|
} |
||||
|
|
||||
|
@Dao |
||||
|
interface CallbackDao { |
||||
|
@Query("SELECT * FROM callbacks WHERE enabled = 1") |
||||
|
suspend fun getEnabled(): List<Callback> |
||||
|
|
||||
|
@Insert(onConflict = OnConflictStrategy.REPLACE) |
||||
|
suspend fun upsert(callback: Callback) |
||||
|
|
||||
|
@Query("DELETE FROM callbacks WHERE id = :id") |
||||
|
suspend fun deleteById(id: String) |
||||
|
} |
||||
|
|
||||
|
@Dao |
||||
|
interface HistoryDao { |
||||
|
@Insert |
||||
|
suspend fun insert(history: History) |
||||
|
|
||||
|
@Query("SELECT * FROM history WHERE occurredAt >= :since ORDER BY occurredAt DESC") |
||||
|
suspend fun getSince(since: Long): List<History> |
||||
|
|
||||
|
@Query("DELETE FROM history WHERE occurredAt < :cutoffTime") |
||||
|
suspend fun deleteOlderThan(cutoffTime: Long) |
||||
|
|
||||
|
@Query("SELECT COUNT(*) FROM history") |
||||
|
suspend fun getCount(): Int |
||||
|
} |
||||
|
|
||||
|
class Converters { |
||||
|
@TypeConverter |
||||
|
fun fromByteArray(value: ByteArray?): String? { |
||||
|
return value?.let { String(it) } |
||||
|
} |
||||
|
|
||||
|
@TypeConverter |
||||
|
fun toByteArray(value: String?): ByteArray? { |
||||
|
return value?.toByteArray() |
||||
|
} |
||||
|
} |
@ -0,0 +1,202 @@ |
|||||
|
package com.timesafari.dailynotification |
||||
|
|
||||
|
import android.content.Context |
||||
|
import android.util.Log |
||||
|
import androidx.work.* |
||||
|
import kotlinx.coroutines.Dispatchers |
||||
|
import kotlinx.coroutines.withContext |
||||
|
import java.io.IOException |
||||
|
import java.net.HttpURLConnection |
||||
|
import java.net.URL |
||||
|
import java.util.concurrent.TimeUnit |
||||
|
|
||||
|
/** |
||||
|
* WorkManager implementation for content fetching |
||||
|
* Implements exponential backoff and network constraints |
||||
|
* |
||||
|
* @author Matthew Raymer |
||||
|
* @version 1.1.0 |
||||
|
*/ |
||||
|
class FetchWorker( |
||||
|
appContext: Context, |
||||
|
workerParams: WorkerParameters |
||||
|
) : CoroutineWorker(appContext, workerParams) { |
||||
|
|
||||
|
companion object { |
||||
|
private const val TAG = "DNP-FETCH" |
||||
|
private const val WORK_NAME = "fetch_content" |
||||
|
|
||||
|
fun scheduleFetch(context: Context, config: ContentFetchConfig) { |
||||
|
val constraints = Constraints.Builder() |
||||
|
.setRequiredNetworkType(NetworkType.CONNECTED) |
||||
|
.build() |
||||
|
|
||||
|
val workRequest = OneTimeWorkRequestBuilder<FetchWorker>() |
||||
|
.setConstraints(constraints) |
||||
|
.setBackoffCriteria( |
||||
|
BackoffPolicy.EXPONENTIAL, |
||||
|
30, |
||||
|
TimeUnit.SECONDS |
||||
|
) |
||||
|
.setInputData( |
||||
|
Data.Builder() |
||||
|
.putString("url", config.url) |
||||
|
.putString("headers", config.headers?.toString()) |
||||
|
.putInt("timeout", config.timeout ?: 30000) |
||||
|
.putInt("retryAttempts", config.retryAttempts ?: 3) |
||||
|
.putInt("retryDelay", config.retryDelay ?: 1000) |
||||
|
.build() |
||||
|
) |
||||
|
.build() |
||||
|
|
||||
|
WorkManager.getInstance(context) |
||||
|
.enqueueUniqueWork( |
||||
|
WORK_NAME, |
||||
|
ExistingWorkPolicy.REPLACE, |
||||
|
workRequest |
||||
|
) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
override suspend fun doWork(): Result = withContext(Dispatchers.IO) { |
||||
|
val start = SystemClock.elapsedRealtime() |
||||
|
val url = inputData.getString("url") |
||||
|
val timeout = inputData.getInt("timeout", 30000) |
||||
|
val retryAttempts = inputData.getInt("retryAttempts", 3) |
||||
|
val retryDelay = inputData.getInt("retryDelay", 1000) |
||||
|
|
||||
|
try { |
||||
|
Log.i(TAG, "Starting content fetch from: $url") |
||||
|
|
||||
|
val payload = fetchContent(url, timeout, retryAttempts, retryDelay) |
||||
|
val contentCache = ContentCache( |
||||
|
id = generateId(), |
||||
|
fetchedAt = System.currentTimeMillis(), |
||||
|
ttlSeconds = 3600, // 1 hour default TTL |
||||
|
payload = payload, |
||||
|
meta = "fetched_by_workmanager" |
||||
|
) |
||||
|
|
||||
|
// Store in database |
||||
|
val db = DailyNotificationDatabase.getDatabase(applicationContext) |
||||
|
db.contentCacheDao().upsert(contentCache) |
||||
|
|
||||
|
// Record success in history |
||||
|
db.historyDao().insert( |
||||
|
History( |
||||
|
refId = contentCache.id, |
||||
|
kind = "fetch", |
||||
|
occurredAt = System.currentTimeMillis(), |
||||
|
durationMs = SystemClock.elapsedRealtime() - start, |
||||
|
outcome = "success" |
||||
|
) |
||||
|
) |
||||
|
|
||||
|
Log.i(TAG, "Content fetch completed successfully") |
||||
|
Result.success() |
||||
|
|
||||
|
} catch (e: IOException) { |
||||
|
Log.w(TAG, "Network error during fetch", e) |
||||
|
recordFailure("network_error", start, e) |
||||
|
Result.retry() |
||||
|
|
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Unexpected error during fetch", e) |
||||
|
recordFailure("unexpected_error", start, e) |
||||
|
Result.failure() |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private suspend fun fetchContent( |
||||
|
url: String?, |
||||
|
timeout: Int, |
||||
|
retryAttempts: Int, |
||||
|
retryDelay: Int |
||||
|
): ByteArray { |
||||
|
if (url.isNullOrBlank()) { |
||||
|
// Generate mock content for testing |
||||
|
return generateMockContent() |
||||
|
} |
||||
|
|
||||
|
var lastException: Exception? = null |
||||
|
|
||||
|
repeat(retryAttempts) { attempt -> |
||||
|
try { |
||||
|
val connection = URL(url).openConnection() as HttpURLConnection |
||||
|
connection.connectTimeout = timeout |
||||
|
connection.readTimeout = timeout |
||||
|
connection.requestMethod = "GET" |
||||
|
|
||||
|
val responseCode = connection.responseCode |
||||
|
if (responseCode == HttpURLConnection.HTTP_OK) { |
||||
|
return connection.inputStream.readBytes() |
||||
|
} else { |
||||
|
throw IOException("HTTP $responseCode: ${connection.responseMessage}") |
||||
|
} |
||||
|
|
||||
|
} catch (e: Exception) { |
||||
|
lastException = e |
||||
|
if (attempt < retryAttempts - 1) { |
||||
|
Log.w(TAG, "Fetch attempt ${attempt + 1} failed, retrying in ${retryDelay}ms", e) |
||||
|
kotlinx.coroutines.delay(retryDelay.toLong()) |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
throw lastException ?: IOException("All retry attempts failed") |
||||
|
} |
||||
|
|
||||
|
private fun generateMockContent(): ByteArray { |
||||
|
val mockData = """ |
||||
|
{ |
||||
|
"timestamp": ${System.currentTimeMillis()}, |
||||
|
"content": "Daily notification content", |
||||
|
"source": "mock_generator", |
||||
|
"version": "1.1.0" |
||||
|
} |
||||
|
""".trimIndent() |
||||
|
return mockData.toByteArray() |
||||
|
} |
||||
|
|
||||
|
private suspend fun recordFailure(outcome: String, start: Long, error: Throwable) { |
||||
|
try { |
||||
|
val db = DailyNotificationDatabase.getDatabase(applicationContext) |
||||
|
db.historyDao().insert( |
||||
|
History( |
||||
|
refId = "fetch_${System.currentTimeMillis()}", |
||||
|
kind = "fetch", |
||||
|
occurredAt = System.currentTimeMillis(), |
||||
|
durationMs = SystemClock.elapsedRealtime() - start, |
||||
|
outcome = outcome, |
||||
|
diagJson = "{\"error\": \"${error.message}\"}" |
||||
|
) |
||||
|
) |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to record failure", e) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private fun generateId(): String { |
||||
|
return "fetch_${System.currentTimeMillis()}_${(1000..9999).random()}" |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Database singleton for Room |
||||
|
*/ |
||||
|
object DailyNotificationDatabase { |
||||
|
@Volatile |
||||
|
private var INSTANCE: DailyNotificationDatabase? = null |
||||
|
|
||||
|
fun getDatabase(context: Context): DailyNotificationDatabase { |
||||
|
return INSTANCE ?: synchronized(this) { |
||||
|
val instance = Room.databaseBuilder( |
||||
|
context.applicationContext, |
||||
|
DailyNotificationDatabase::class.java, |
||||
|
"daily_notification_database" |
||||
|
).build() |
||||
|
INSTANCE = instance |
||||
|
instance |
||||
|
} |
||||
|
} |
||||
|
} |
@ -0,0 +1,253 @@ |
|||||
|
package com.timesafari.dailynotification |
||||
|
|
||||
|
import android.app.AlarmManager |
||||
|
import android.app.NotificationChannel |
||||
|
import android.app.NotificationManager |
||||
|
import android.app.PendingIntent |
||||
|
import android.content.BroadcastReceiver |
||||
|
import android.content.Context |
||||
|
import android.content.Intent |
||||
|
import android.os.Build |
||||
|
import android.util.Log |
||||
|
import androidx.core.app.NotificationCompat |
||||
|
import kotlinx.coroutines.CoroutineScope |
||||
|
import kotlinx.coroutines.Dispatchers |
||||
|
import kotlinx.coroutines.launch |
||||
|
|
||||
|
/** |
||||
|
* AlarmManager implementation for user notifications |
||||
|
* Implements TTL-at-fire logic and notification delivery |
||||
|
* |
||||
|
* @author Matthew Raymer |
||||
|
* @version 1.1.0 |
||||
|
*/ |
||||
|
class NotifyReceiver : BroadcastReceiver() { |
||||
|
|
||||
|
companion object { |
||||
|
private const val TAG = "DNP-NOTIFY" |
||||
|
private const val CHANNEL_ID = "daily_notifications" |
||||
|
private const val NOTIFICATION_ID = 1001 |
||||
|
private const val REQUEST_CODE = 2001 |
||||
|
|
||||
|
fun scheduleExactNotification( |
||||
|
context: Context, |
||||
|
triggerAtMillis: Long, |
||||
|
config: UserNotificationConfig |
||||
|
) { |
||||
|
val alarmManager = context.getSystemService(Context.ALARM_SERVICE) as AlarmManager |
||||
|
val intent = Intent(context, NotifyReceiver::class.java).apply { |
||||
|
putExtra("title", config.title) |
||||
|
putExtra("body", config.body) |
||||
|
putExtra("sound", config.sound ?: true) |
||||
|
putExtra("vibration", config.vibration ?: true) |
||||
|
putExtra("priority", config.priority ?: "normal") |
||||
|
} |
||||
|
|
||||
|
val pendingIntent = PendingIntent.getBroadcast( |
||||
|
context, |
||||
|
REQUEST_CODE, |
||||
|
intent, |
||||
|
PendingIntent.FLAG_UPDATE_CURRENT or PendingIntent.FLAG_IMMUTABLE |
||||
|
) |
||||
|
|
||||
|
try { |
||||
|
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) { |
||||
|
alarmManager.setExactAndAllowWhileIdle( |
||||
|
AlarmManager.RTC_WAKEUP, |
||||
|
triggerAtMillis, |
||||
|
pendingIntent |
||||
|
) |
||||
|
} else { |
||||
|
alarmManager.setExact( |
||||
|
AlarmManager.RTC_WAKEUP, |
||||
|
triggerAtMillis, |
||||
|
pendingIntent |
||||
|
) |
||||
|
} |
||||
|
Log.i(TAG, "Exact notification scheduled for: $triggerAtMillis") |
||||
|
} catch (e: SecurityException) { |
||||
|
Log.w(TAG, "Cannot schedule exact alarm, falling back to inexact", e) |
||||
|
alarmManager.set( |
||||
|
AlarmManager.RTC_WAKEUP, |
||||
|
triggerAtMillis, |
||||
|
pendingIntent |
||||
|
) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
fun cancelNotification(context: Context) { |
||||
|
val alarmManager = context.getSystemService(Context.ALARM_SERVICE) as AlarmManager |
||||
|
val intent = Intent(context, NotifyReceiver::class.java) |
||||
|
val pendingIntent = PendingIntent.getBroadcast( |
||||
|
context, |
||||
|
REQUEST_CODE, |
||||
|
intent, |
||||
|
PendingIntent.FLAG_UPDATE_CURRENT or PendingIntent.FLAG_IMMUTABLE |
||||
|
) |
||||
|
alarmManager.cancel(pendingIntent) |
||||
|
Log.i(TAG, "Notification alarm cancelled") |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
override fun onReceive(context: Context, intent: Intent?) { |
||||
|
Log.i(TAG, "Notification receiver triggered") |
||||
|
|
||||
|
CoroutineScope(Dispatchers.IO).launch { |
||||
|
try { |
||||
|
val db = DailyNotificationDatabase.getDatabase(context) |
||||
|
val latestCache = db.contentCacheDao().getLatest() |
||||
|
|
||||
|
if (latestCache == null) { |
||||
|
Log.w(TAG, "No cached content available for notification") |
||||
|
recordHistory(db, "notify", "no_content") |
||||
|
return@launch |
||||
|
} |
||||
|
|
||||
|
// TTL-at-fire check |
||||
|
val now = System.currentTimeMillis() |
||||
|
val ttlExpiry = latestCache.fetchedAt + (latestCache.ttlSeconds * 1000L) |
||||
|
|
||||
|
if (now > ttlExpiry) { |
||||
|
Log.i(TAG, "Content TTL expired, skipping notification") |
||||
|
recordHistory(db, "notify", "skipped_ttl") |
||||
|
return@launch |
||||
|
} |
||||
|
|
||||
|
// Show notification |
||||
|
val title = intent?.getStringExtra("title") ?: "Daily Notification" |
||||
|
val body = intent?.getStringExtra("body") ?: String(latestCache.payload) |
||||
|
val sound = intent?.getBooleanExtra("sound", true) ?: true |
||||
|
val vibration = intent?.getBooleanExtra("vibration", true) ?: true |
||||
|
val priority = intent?.getStringExtra("priority") ?: "normal" |
||||
|
|
||||
|
showNotification(context, title, body, sound, vibration, priority) |
||||
|
recordHistory(db, "notify", "success") |
||||
|
|
||||
|
// Fire callbacks |
||||
|
fireCallbacks(context, db, "onNotifyDelivered", latestCache) |
||||
|
|
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Error in notification receiver", e) |
||||
|
try { |
||||
|
val db = DailyNotificationDatabase.getDatabase(context) |
||||
|
recordHistory(db, "notify", "failure", e.message) |
||||
|
} catch (dbError: Exception) { |
||||
|
Log.e(TAG, "Failed to record notification failure", dbError) |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private fun showNotification( |
||||
|
context: Context, |
||||
|
title: String, |
||||
|
body: String, |
||||
|
sound: Boolean, |
||||
|
vibration: Boolean, |
||||
|
priority: String |
||||
|
) { |
||||
|
val notificationManager = context.getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager |
||||
|
|
||||
|
// Create notification channel for Android 8.0+ |
||||
|
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { |
||||
|
val channel = NotificationChannel( |
||||
|
CHANNEL_ID, |
||||
|
"Daily Notifications", |
||||
|
when (priority) { |
||||
|
"high" -> NotificationManager.IMPORTANCE_HIGH |
||||
|
"low" -> NotificationManager.IMPORTANCE_LOW |
||||
|
else -> NotificationManager.IMPORTANCE_DEFAULT |
||||
|
} |
||||
|
).apply { |
||||
|
enableVibration(vibration) |
||||
|
if (!sound) { |
||||
|
setSound(null, null) |
||||
|
} |
||||
|
} |
||||
|
notificationManager.createNotificationChannel(channel) |
||||
|
} |
||||
|
|
||||
|
val notification = NotificationCompat.Builder(context, CHANNEL_ID) |
||||
|
.setContentTitle(title) |
||||
|
.setContentText(body) |
||||
|
.setSmallIcon(android.R.drawable.ic_dialog_info) |
||||
|
.setPriority( |
||||
|
when (priority) { |
||||
|
"high" -> NotificationCompat.PRIORITY_HIGH |
||||
|
"low" -> NotificationCompat.PRIORITY_LOW |
||||
|
else -> NotificationCompat.PRIORITY_DEFAULT |
||||
|
} |
||||
|
) |
||||
|
.setAutoCancel(true) |
||||
|
.setVibrate(if (vibration) longArrayOf(0, 250, 250, 250) else null) |
||||
|
.build() |
||||
|
|
||||
|
notificationManager.notify(NOTIFICATION_ID, notification) |
||||
|
Log.i(TAG, "Notification displayed: $title") |
||||
|
} |
||||
|
|
||||
|
private suspend fun recordHistory( |
||||
|
db: DailyNotificationDatabase, |
||||
|
kind: String, |
||||
|
outcome: String, |
||||
|
diagJson: String? = null |
||||
|
) { |
||||
|
try { |
||||
|
db.historyDao().insert( |
||||
|
History( |
||||
|
refId = "notify_${System.currentTimeMillis()}", |
||||
|
kind = kind, |
||||
|
occurredAt = System.currentTimeMillis(), |
||||
|
outcome = outcome, |
||||
|
diagJson = diagJson |
||||
|
) |
||||
|
) |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to record history", e) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private suspend fun fireCallbacks( |
||||
|
context: Context, |
||||
|
db: DailyNotificationDatabase, |
||||
|
eventType: String, |
||||
|
contentCache: ContentCache |
||||
|
) { |
||||
|
try { |
||||
|
val callbacks = db.callbackDao().getEnabled() |
||||
|
callbacks.forEach { callback -> |
||||
|
try { |
||||
|
when (callback.kind) { |
||||
|
"http" -> fireHttpCallback(callback, eventType, contentCache) |
||||
|
"local" -> fireLocalCallback(context, callback, eventType, contentCache) |
||||
|
else -> Log.w(TAG, "Unknown callback kind: ${callback.kind}") |
||||
|
} |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to fire callback ${callback.id}", e) |
||||
|
recordHistory(db, "callback", "failure", "{\"callback_id\": \"${callback.id}\", \"error\": \"${e.message}\"}") |
||||
|
} |
||||
|
} |
||||
|
} catch (e: Exception) { |
||||
|
Log.e(TAG, "Failed to fire callbacks", e) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private suspend fun fireHttpCallback( |
||||
|
callback: Callback, |
||||
|
eventType: String, |
||||
|
contentCache: ContentCache |
||||
|
) { |
||||
|
// HTTP callback implementation would go here |
||||
|
Log.i(TAG, "HTTP callback fired: ${callback.id} for event: $eventType") |
||||
|
} |
||||
|
|
||||
|
private suspend fun fireLocalCallback( |
||||
|
context: Context, |
||||
|
callback: Callback, |
||||
|
eventType: String, |
||||
|
contentCache: ContentCache |
||||
|
) { |
||||
|
// Local callback implementation would go here |
||||
|
Log.i(TAG, "Local callback fired: ${callback.id} for event: $eventType") |
||||
|
} |
||||
|
} |
@ -0,0 +1,255 @@ |
|||||
|
|
||||
|
# Daily Notification Plugin — Phase 2 Recommendations (v3) |
||||
|
|
||||
|
> This directive assumes Phase 1 (API surface + tests) is complete and aligns with the current codebase. It focuses on **platform implementations**, **storage/TTL**, **callbacks**, **observability**, and **security**. |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 1) Milestones & Order of Work |
||||
|
|
||||
|
1. **Android Core (Week 1–2)** |
||||
|
- Fetch: WorkManager (`Constraints: NETWORK_CONNECTED`, backoff: exponential) |
||||
|
- Notify: AlarmManager (or Exact alarms if permitted), NotificationManager |
||||
|
- Boot resilience: `RECEIVE_BOOT_COMPLETED` receiver reschedules jobs |
||||
|
- Shared SQLite schema + DAO layer (Room recommended) |
||||
|
2. **Callback Registry (Week 2)** — shared TS interface + native bridges |
||||
|
3. **Observability & Health (Week 2–3)** — event codes, status endpoints, history compaction |
||||
|
4. **iOS Parity (Week 3–4)** — BGTaskScheduler + UNUserNotificationCenter |
||||
|
5. **Web SW/Push (Week 4)** — SW events + IndexedDB (mirror schema), periodic sync fallback |
||||
|
6. **Docs & Examples (Week 4)** — migration, enterprise callbacks, health dashboards |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 2) Storage & TTL — Concrete Schema |
||||
|
|
||||
|
> Keep **TTL-at-fire** invariant and **rolling window armed**. Use normalized tables and a minimal DAO. |
||||
|
|
||||
|
### SQLite (DDL) |
||||
|
|
||||
|
```sql |
||||
|
CREATE TABLE IF NOT EXISTS content_cache ( |
||||
|
id TEXT PRIMARY KEY, |
||||
|
fetched_at INTEGER NOT NULL, -- epoch ms |
||||
|
ttl_seconds INTEGER NOT NULL, |
||||
|
payload BLOB NOT NULL, |
||||
|
meta TEXT |
||||
|
); |
||||
|
|
||||
|
CREATE TABLE IF NOT EXISTS schedules ( |
||||
|
id TEXT PRIMARY KEY, |
||||
|
kind TEXT NOT NULL CHECK (kind IN ('fetch','notify')), |
||||
|
cron TEXT, -- optional: cron expression |
||||
|
clock_time TEXT, -- optional: HH:mm |
||||
|
enabled INTEGER NOT NULL DEFAULT 1, |
||||
|
last_run_at INTEGER, |
||||
|
next_run_at INTEGER, |
||||
|
jitter_ms INTEGER DEFAULT 0, |
||||
|
backoff_policy TEXT DEFAULT 'exp', |
||||
|
state_json TEXT |
||||
|
); |
||||
|
|
||||
|
CREATE TABLE IF NOT EXISTS callbacks ( |
||||
|
id TEXT PRIMARY KEY, |
||||
|
kind TEXT NOT NULL CHECK (kind IN ('http','local','queue')), |
||||
|
target TEXT NOT NULL, -- url_or_local |
||||
|
headers_json TEXT, |
||||
|
enabled INTEGER NOT NULL DEFAULT 1, |
||||
|
created_at INTEGER NOT NULL |
||||
|
); |
||||
|
|
||||
|
CREATE TABLE IF NOT EXISTS history ( |
||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
||||
|
ref_id TEXT, -- content or schedule id |
||||
|
kind TEXT NOT NULL, -- fetch/notify/callback |
||||
|
occurred_at INTEGER NOT NULL, |
||||
|
duration_ms INTEGER, |
||||
|
outcome TEXT NOT NULL, -- success|failure|skipped_ttl|circuit_open |
||||
|
diag_json TEXT |
||||
|
); |
||||
|
|
||||
|
CREATE INDEX IF NOT EXISTS idx_history_time ON history(occurred_at); |
||||
|
CREATE INDEX IF NOT EXISTS idx_cache_time ON content_cache(fetched_at); |
||||
|
``` |
||||
|
|
||||
|
### TTL-at-fire Rule |
||||
|
|
||||
|
- On notification fire: `if (now > fetched_at + ttl_seconds) -> skip (record outcome=skipped_ttl)`. |
||||
|
- Maintain a **prep guarantee**: ensure a fresh cache entry for the next window even after failures (schedule a fetch on next window). |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 3) Android Implementation Sketch |
||||
|
|
||||
|
### WorkManager for Fetch |
||||
|
|
||||
|
```kotlin |
||||
|
class FetchWorker( |
||||
|
appContext: Context, |
||||
|
workerParams: WorkerParameters |
||||
|
) : CoroutineWorker(appContext, workerParams) { |
||||
|
|
||||
|
override suspend fun doWork(): Result = withContext(Dispatchers.IO) { |
||||
|
val start = SystemClock.elapsedRealtime() |
||||
|
try { |
||||
|
val payload = fetchContent() // http call / local generator |
||||
|
dao.upsertCache(ContentCache(...)) |
||||
|
logEvent("DNP-FETCH-SUCCESS", start) |
||||
|
Result.success() |
||||
|
} catch (e: IOException) { |
||||
|
logEvent("DNP-FETCH-FAILURE", start, e) |
||||
|
Result.retry() |
||||
|
} catch (e: Throwable) { |
||||
|
logEvent("DNP-FETCH-FAILURE", start, e) |
||||
|
Result.failure() |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
``` |
||||
|
|
||||
|
**Constraints**: `Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()` |
||||
|
**Backoff**: `setBackoffCriteria(BackoffPolicy.EXPONENTIAL, 30, TimeUnit.SECONDS)` |
||||
|
|
||||
|
### AlarmManager for Notify |
||||
|
|
||||
|
```kotlin |
||||
|
fun scheduleExactNotification(context: Context, triggerAtMillis: Long) { |
||||
|
val alarmMgr = context.getSystemService(Context.ALARM_SERVICE) as AlarmManager |
||||
|
val pi = PendingIntent.getBroadcast(context, REQ_ID, Intent(context, NotifyReceiver::class.java), FLAG_IMMUTABLE) |
||||
|
alarmMgr.setExactAndAllowWhileIdle(AlarmManager.RTC_WAKEUP, triggerAtMillis, pi) |
||||
|
} |
||||
|
|
||||
|
class NotifyReceiver : BroadcastReceiver() { |
||||
|
override fun onReceive(ctx: Context, intent: Intent?) { |
||||
|
val cache = dao.latestCache() |
||||
|
if (cache == null) return |
||||
|
if (System.currentTimeMillis() > cache.fetched_at + cache.ttl_seconds * 1000) { |
||||
|
recordHistory("notify", "skipped_ttl"); return |
||||
|
} |
||||
|
showNotification(ctx, cache) |
||||
|
recordHistory("notify", "success") |
||||
|
fireCallbacks("onNotifyDelivered") |
||||
|
} |
||||
|
} |
||||
|
``` |
||||
|
|
||||
|
### Boot Reschedule |
||||
|
|
||||
|
- Manifest: `RECEIVE_BOOT_COMPLETED` |
||||
|
- On boot: read `schedules.enabled=1` and re-schedule WorkManager/AlarmManager |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 4) Callback Registry — Minimal Viable Implementation |
||||
|
|
||||
|
### TS Core |
||||
|
|
||||
|
```ts |
||||
|
export type CallbackKind = 'http' | 'local' | 'queue'; |
||||
|
|
||||
|
export interface CallbackEvent { |
||||
|
id: string; |
||||
|
at: number; |
||||
|
type: 'onFetchStart' | 'onFetchSuccess' | 'onFetchFailure' | |
||||
|
'onNotifyStart' | 'onNotifyDelivered' | 'onNotifySkippedTTL' | 'onNotifyFailure'; |
||||
|
payload?: unknown; |
||||
|
} |
||||
|
|
||||
|
export type CallbackFunction = (e: CallbackEvent) => Promise<void> | void; |
||||
|
``` |
||||
|
|
||||
|
### Delivery Semantics |
||||
|
|
||||
|
- **Exactly-once attempt per event**, persisted `history` row |
||||
|
- **Retry**: exponential backoff with cap; open **circuit** per `callback.id` on repeated failures |
||||
|
- **Redaction**: apply header/body redaction before persisting `diag_json` |
||||
|
|
||||
|
### HTTP Example |
||||
|
|
||||
|
```ts |
||||
|
async function deliverHttpCallback(cb: CallbackRecord, event: CallbackEvent) { |
||||
|
const start = performance.now(); |
||||
|
try { |
||||
|
const res = await fetch(cb.target, { |
||||
|
method: 'POST', |
||||
|
headers: { 'content-type': 'application/json', ...(cb.headers ?? {}) }, |
||||
|
body: JSON.stringify(event), |
||||
|
}); |
||||
|
recordHistory(cb.id, 'callback', 'success', start, { status: res.status }); |
||||
|
} catch (err) { |
||||
|
scheduleRetry(cb.id, event); // capped exponential |
||||
|
recordHistory(cb.id, 'callback', 'failure', start, { error: String(err) }); |
||||
|
} |
||||
|
} |
||||
|
``` |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 5) Observability & Health |
||||
|
|
||||
|
- **Event Codes**: `DNP-FETCH-*`, `DNP-NOTIFY-*`, `DNP-CB-*` |
||||
|
- **Health API** (TS): `getDualScheduleStatus()` returns `{ nextRuns, lastOutcomes, cacheAgeMs, staleArmed, queueDepth }` |
||||
|
- **Compaction**: nightly job to prune `history` > 30 days |
||||
|
- **Device Debug**: Android broadcast to dump status to logcat for field diagnostics |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 6) Security & Permissions |
||||
|
|
||||
|
- Default **HTTPS-only** callbacks, opt-out via explicit dev flag |
||||
|
- Android: runtime gate for `POST_NOTIFICATIONS`; show rationale UI for exact alarms (if requested) |
||||
|
- **PII/Secrets**: redact before persistence; never log tokens |
||||
|
- **Input Validation**: sanitize HTTP callback targets; enforce allowlist pattern (e.g., `https://*.yourdomain.tld` in prod) |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 7) Performance & Battery |
||||
|
|
||||
|
- **±Jitter (5m)** for fetch; coalesce same-minute schedules |
||||
|
- **Retry Caps**: ≤ 5 attempts, upper bound 60 min backoff |
||||
|
- **Network Guards**: avoid waking when offline; use WorkManager constraints to defer |
||||
|
- **Back-Pressure**: cap concurrent callbacks; open circuit on sustained failures |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 8) Tests You Can Add Now |
||||
|
|
||||
|
- **TTL Edge Cases**: past/future timezones, DST cutovers |
||||
|
- **Retry & Circuit**: force network failures, assert capped retries + circuit open |
||||
|
- **Boot Reschedule**: instrumentation test to simulate reboot and check re-arming |
||||
|
- **SW/IndexedDB**: headless test verifying cache write/read + TTL skip |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 9) Documentation Tasks |
||||
|
|
||||
|
- API reference for new **health** and **callback** semantics |
||||
|
- Platform guides: Android exact alarm notes, iOS background limits, Web SW lifecycle |
||||
|
- Migration note: why `scheduleDualNotification` is preferred; compat wrappers policy |
||||
|
- “Runbook” for QA: how to toggle jitter/backoff; how to inspect `history` |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 10) Acceptance Criteria (Phase 2) |
||||
|
|
||||
|
- Android end-to-end demo: fetch → cache → TTL check → notify → callback(s) → history |
||||
|
- Health endpoint returns non-null next run, recent outcomes, and cache age |
||||
|
- iOS parity path demonstrated on simulator (background fetch + local notif) |
||||
|
- Web SW functional on Chromium + Firefox with IndexedDB persistence |
||||
|
- Logs show structured `DNP-*` events; compaction reduces history size as configured |
||||
|
- Docs updated; examples build and run |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 11) Risks & Mitigations |
||||
|
|
||||
|
- **Doze/Idle drops alarms** → prefer WorkManager + exact when allowed; add tolerance window |
||||
|
- **iOS background unpredictability** → encourage scheduled “fetch windows”; document silent-push optionality |
||||
|
- **Web Push unavailable** → periodic sync + foreground fallback; degrade gracefully |
||||
|
- **Callback storms** → batch events where possible; per-callback rate limit |
||||
|
|
||||
|
--- |
||||
|
|
||||
|
## 12) Versioning |
||||
|
|
||||
|
- Release as `1.1.0` when Android path merges; mark wrappers as **soft-deprecated** in docs |
||||
|
- Keep zero-padded doc versions in `/doc/` and release notes linking to them |
@ -0,0 +1,283 @@ |
|||||
|
/** |
||||
|
* Callback Registry Implementation |
||||
|
* Provides uniform callback lifecycle usable from any platform |
||||
|
* |
||||
|
* @author Matthew Raymer |
||||
|
* @version 1.1.0 |
||||
|
*/ |
||||
|
|
||||
|
export type CallbackKind = 'http' | 'local' | 'queue'; |
||||
|
|
||||
|
export interface CallbackEvent { |
||||
|
id: string; |
||||
|
at: number; |
||||
|
type: 'onFetchStart' | 'onFetchSuccess' | 'onFetchFailure' | |
||||
|
'onNotifyStart' | 'onNotifyDelivered' | 'onNotifySkippedTTL' | 'onNotifyFailure'; |
||||
|
payload?: unknown; |
||||
|
} |
||||
|
|
||||
|
export type CallbackFunction = (e: CallbackEvent) => Promise<void> | void; |
||||
|
|
||||
|
export interface CallbackRecord { |
||||
|
id: string; |
||||
|
kind: CallbackKind; |
||||
|
target: string; |
||||
|
headers?: Record<string, string>; |
||||
|
enabled: boolean; |
||||
|
createdAt: number; |
||||
|
retryCount?: number; |
||||
|
lastFailure?: number; |
||||
|
circuitOpen?: boolean; |
||||
|
} |
||||
|
|
||||
|
export interface CallbackRegistry { |
||||
|
register(id: string, callback: CallbackRecord): Promise<void>; |
||||
|
unregister(id: string): Promise<void>; |
||||
|
fire(event: CallbackEvent): Promise<void>; |
||||
|
getRegistered(): Promise<CallbackRecord[]>; |
||||
|
getStatus(): Promise<{ |
||||
|
total: number; |
||||
|
enabled: number; |
||||
|
circuitOpen: number; |
||||
|
lastActivity: number; |
||||
|
}>; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Callback Registry Implementation |
||||
|
* Handles callback registration, delivery, and circuit breaker logic |
||||
|
*/ |
||||
|
export class CallbackRegistryImpl implements CallbackRegistry { |
||||
|
private callbacks = new Map<string, CallbackRecord>(); |
||||
|
private localCallbacks = new Map<string, CallbackFunction>(); |
||||
|
private retryQueue = new Map<string, CallbackEvent[]>(); |
||||
|
private circuitBreakers = new Map<string, { |
||||
|
failures: number; |
||||
|
lastFailure: number; |
||||
|
open: boolean; |
||||
|
}>(); |
||||
|
|
||||
|
constructor() { |
||||
|
this.startRetryProcessor(); |
||||
|
} |
||||
|
|
||||
|
async register(id: string, callback: CallbackRecord): Promise<void> { |
||||
|
this.callbacks.set(id, callback); |
||||
|
|
||||
|
// Initialize circuit breaker
|
||||
|
if (!this.circuitBreakers.has(id)) { |
||||
|
this.circuitBreakers.set(id, { |
||||
|
failures: 0, |
||||
|
lastFailure: 0, |
||||
|
open: false |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
console.log(`DNP-CB-REGISTER: Callback ${id} registered (${callback.kind})`); |
||||
|
} |
||||
|
|
||||
|
async unregister(id: string): Promise<void> { |
||||
|
this.callbacks.delete(id); |
||||
|
this.localCallbacks.delete(id); |
||||
|
this.retryQueue.delete(id); |
||||
|
this.circuitBreakers.delete(id); |
||||
|
|
||||
|
console.log(`DNP-CB-UNREGISTER: Callback ${id} unregistered`); |
||||
|
} |
||||
|
|
||||
|
async fire(event: CallbackEvent): Promise<void> { |
||||
|
const enabledCallbacks = Array.from(this.callbacks.values()) |
||||
|
.filter(cb => cb.enabled); |
||||
|
|
||||
|
console.log(`DNP-CB-FIRE: Firing event ${event.type} to ${enabledCallbacks.length} callbacks`); |
||||
|
|
||||
|
for (const callback of enabledCallbacks) { |
||||
|
try { |
||||
|
await this.deliverCallback(callback, event); |
||||
|
} catch (error) { |
||||
|
console.error(`DNP-CB-FIRE-ERROR: Failed to deliver to ${callback.id}`, error); |
||||
|
await this.handleCallbackFailure(callback, event, error); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
async getRegistered(): Promise<CallbackRecord[]> { |
||||
|
return Array.from(this.callbacks.values()); |
||||
|
} |
||||
|
|
||||
|
async getStatus(): Promise<{ |
||||
|
total: number; |
||||
|
enabled: number; |
||||
|
circuitOpen: number; |
||||
|
lastActivity: number; |
||||
|
}> { |
||||
|
const callbacks = Array.from(this.callbacks.values()); |
||||
|
const circuitBreakers = Array.from(this.circuitBreakers.values()); |
||||
|
|
||||
|
return { |
||||
|
total: callbacks.length, |
||||
|
enabled: callbacks.filter(cb => cb.enabled).length, |
||||
|
circuitOpen: circuitBreakers.filter(cb => cb.open).length, |
||||
|
lastActivity: Math.max( |
||||
|
...callbacks.map(cb => cb.createdAt), |
||||
|
...circuitBreakers.map(cb => cb.lastFailure) |
||||
|
) |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
private async deliverCallback(callback: CallbackRecord, event: CallbackEvent): Promise<void> { |
||||
|
const circuitBreaker = this.circuitBreakers.get(callback.id); |
||||
|
|
||||
|
// Check circuit breaker
|
||||
|
if (circuitBreaker?.open) { |
||||
|
console.warn(`DNP-CB-CIRCUIT: Circuit open for ${callback.id}, skipping delivery`); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
const start = performance.now(); |
||||
|
|
||||
|
try { |
||||
|
switch (callback.kind) { |
||||
|
case 'http': |
||||
|
await this.deliverHttpCallback(callback, event); |
||||
|
break; |
||||
|
case 'local': |
||||
|
await this.deliverLocalCallback(callback, event); |
||||
|
break; |
||||
|
case 'queue': |
||||
|
await this.deliverQueueCallback(callback, event); |
||||
|
break; |
||||
|
default: |
||||
|
throw new Error(`Unknown callback kind: ${callback.kind}`); |
||||
|
} |
||||
|
|
||||
|
// Reset circuit breaker on success
|
||||
|
if (circuitBreaker) { |
||||
|
circuitBreaker.failures = 0; |
||||
|
circuitBreaker.open = false; |
||||
|
} |
||||
|
|
||||
|
const duration = performance.now() - start; |
||||
|
console.log(`DNP-CB-SUCCESS: Delivered to ${callback.id} in ${duration.toFixed(2)}ms`); |
||||
|
|
||||
|
} catch (error) { |
||||
|
throw error; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private async deliverHttpCallback(callback: CallbackRecord, event: CallbackEvent): Promise<void> { |
||||
|
const response = await fetch(callback.target, { |
||||
|
method: 'POST', |
||||
|
headers: { |
||||
|
'Content-Type': 'application/json', |
||||
|
...callback.headers |
||||
|
}, |
||||
|
body: JSON.stringify({ |
||||
|
...event, |
||||
|
callbackId: callback.id, |
||||
|
timestamp: Date.now() |
||||
|
}) |
||||
|
}); |
||||
|
|
||||
|
if (!response.ok) { |
||||
|
throw new Error(`HTTP ${response.status}: ${response.statusText}`); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private async deliverLocalCallback(callback: CallbackRecord, event: CallbackEvent): Promise<void> { |
||||
|
const localCallback = this.localCallbacks.get(callback.id); |
||||
|
if (!localCallback) { |
||||
|
throw new Error(`Local callback ${callback.id} not found`); |
||||
|
} |
||||
|
|
||||
|
await localCallback(event); |
||||
|
} |
||||
|
|
||||
|
private async deliverQueueCallback(callback: CallbackRecord, event: CallbackEvent): Promise<void> { |
||||
|
// Queue callback implementation would go here
|
||||
|
// For now, just log the event
|
||||
|
console.log(`DNP-CB-QUEUE: Queued event ${event.type} for ${callback.id}`); |
||||
|
} |
||||
|
|
||||
|
private async handleCallbackFailure( |
||||
|
callback: CallbackRecord, |
||||
|
event: CallbackEvent, |
||||
|
error: unknown |
||||
|
): Promise<void> { |
||||
|
const circuitBreaker = this.circuitBreakers.get(callback.id); |
||||
|
|
||||
|
if (circuitBreaker) { |
||||
|
circuitBreaker.failures++; |
||||
|
circuitBreaker.lastFailure = Date.now(); |
||||
|
|
||||
|
// Open circuit after 5 consecutive failures
|
||||
|
if (circuitBreaker.failures >= 5) { |
||||
|
circuitBreaker.open = true; |
||||
|
console.error(`DNP-CB-CIRCUIT-OPEN: Circuit opened for ${callback.id} after ${circuitBreaker.failures} failures`); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// Schedule retry with exponential backoff
|
||||
|
await this.scheduleRetry(callback, event); |
||||
|
|
||||
|
console.error(`DNP-CB-FAILURE: Callback ${callback.id} failed`, error); |
||||
|
} |
||||
|
|
||||
|
private async scheduleRetry(callback: CallbackRecord, event: CallbackEvent): Promise<void> { |
||||
|
const retryCount = callback.retryCount || 0; |
||||
|
|
||||
|
if (retryCount >= 5) { |
||||
|
console.warn(`DNP-CB-RETRY-LIMIT: Max retries reached for ${callback.id}`); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
const backoffMs = Math.min(1000 * Math.pow(2, retryCount), 60000); // Cap at 1 minute
|
||||
|
const retryEvent = { ...event, retryCount: retryCount + 1 }; |
||||
|
|
||||
|
if (!this.retryQueue.has(callback.id)) { |
||||
|
this.retryQueue.set(callback.id, []); |
||||
|
} |
||||
|
|
||||
|
this.retryQueue.get(callback.id)!.push(retryEvent); |
||||
|
|
||||
|
console.log(`DNP-CB-RETRY: Scheduled retry ${retryCount + 1} for ${callback.id} in ${backoffMs}ms`); |
||||
|
} |
||||
|
|
||||
|
private startRetryProcessor(): void { |
||||
|
setInterval(async () => { |
||||
|
for (const [callbackId, events] of this.retryQueue.entries()) { |
||||
|
if (events.length === 0) continue; |
||||
|
|
||||
|
const callback = this.callbacks.get(callbackId); |
||||
|
if (!callback) { |
||||
|
this.retryQueue.delete(callbackId); |
||||
|
continue; |
||||
|
} |
||||
|
|
||||
|
const event = events.shift(); |
||||
|
if (!event) continue; |
||||
|
|
||||
|
try { |
||||
|
await this.deliverCallback(callback, event); |
||||
|
} catch (error) { |
||||
|
console.error(`DNP-CB-RETRY-FAILED: Retry failed for ${callbackId}`, error); |
||||
|
} |
||||
|
} |
||||
|
}, 5000); // Process retries every 5 seconds
|
||||
|
} |
||||
|
|
||||
|
// Register local callback function
|
||||
|
registerLocalCallback(id: string, callback: CallbackFunction): void { |
||||
|
this.localCallbacks.set(id, callback); |
||||
|
console.log(`DNP-CB-LOCAL: Local callback ${id} registered`); |
||||
|
} |
||||
|
|
||||
|
// Unregister local callback function
|
||||
|
unregisterLocalCallback(id: string): void { |
||||
|
this.localCallbacks.delete(id); |
||||
|
console.log(`DNP-CB-LOCAL: Local callback ${id} unregistered`); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// Singleton instance
|
||||
|
export const callbackRegistry = new CallbackRegistryImpl(); |
@ -0,0 +1,311 @@ |
|||||
|
/** |
||||
|
* Observability & Health Monitoring Implementation |
||||
|
* Provides structured logging, event codes, and health monitoring |
||||
|
* |
||||
|
* @author Matthew Raymer |
||||
|
* @version 1.1.0 |
||||
|
*/ |
||||
|
|
||||
|
export interface HealthStatus { |
||||
|
nextRuns: number[]; |
||||
|
lastOutcomes: string[]; |
||||
|
cacheAgeMs: number | null; |
||||
|
staleArmed: boolean; |
||||
|
queueDepth: number; |
||||
|
circuitBreakers: { |
||||
|
total: number; |
||||
|
open: number; |
||||
|
failures: number; |
||||
|
}; |
||||
|
performance: { |
||||
|
avgFetchTime: number; |
||||
|
avgNotifyTime: number; |
||||
|
successRate: number; |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
export interface EventLog { |
||||
|
id: string; |
||||
|
timestamp: number; |
||||
|
level: 'INFO' | 'WARN' | 'ERROR'; |
||||
|
eventCode: string; |
||||
|
message: string; |
||||
|
data?: Record<string, unknown>; |
||||
|
duration?: number; |
||||
|
} |
||||
|
|
||||
|
export interface PerformanceMetrics { |
||||
|
fetchTimes: number[]; |
||||
|
notifyTimes: number[]; |
||||
|
callbackTimes: number[]; |
||||
|
successCount: number; |
||||
|
failureCount: number; |
||||
|
lastReset: number; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Observability Manager |
||||
|
* Handles structured logging, health monitoring, and performance tracking |
||||
|
*/ |
||||
|
export class ObservabilityManager { |
||||
|
private eventLogs: EventLog[] = []; |
||||
|
private performanceMetrics: PerformanceMetrics = { |
||||
|
fetchTimes: [], |
||||
|
notifyTimes: [], |
||||
|
callbackTimes: [], |
||||
|
successCount: 0, |
||||
|
failureCount: 0, |
||||
|
lastReset: Date.now() |
||||
|
}; |
||||
|
private maxLogs = 1000; |
||||
|
private maxMetrics = 100; |
||||
|
|
||||
|
/** |
||||
|
* Log structured event with event code |
||||
|
*/ |
||||
|
logEvent( |
||||
|
level: 'INFO' | 'WARN' | 'ERROR', |
||||
|
eventCode: string, |
||||
|
message: string, |
||||
|
data?: Record<string, unknown>, |
||||
|
duration?: number |
||||
|
): void { |
||||
|
const event: EventLog = { |
||||
|
id: this.generateEventId(), |
||||
|
timestamp: Date.now(), |
||||
|
level, |
||||
|
eventCode, |
||||
|
message, |
||||
|
data, |
||||
|
duration |
||||
|
}; |
||||
|
|
||||
|
this.eventLogs.unshift(event); |
||||
|
|
||||
|
// Keep only recent logs
|
||||
|
if (this.eventLogs.length > this.maxLogs) { |
||||
|
this.eventLogs = this.eventLogs.slice(0, this.maxLogs); |
||||
|
} |
||||
|
|
||||
|
// Console output with structured format
|
||||
|
const logMessage = `[${eventCode}] ${message}`; |
||||
|
const logData = data ? ` | Data: ${JSON.stringify(data)}` : ''; |
||||
|
const logDuration = duration ? ` | Duration: ${duration}ms` : ''; |
||||
|
|
||||
|
switch (level) { |
||||
|
case 'INFO': |
||||
|
console.log(logMessage + logData + logDuration); |
||||
|
break; |
||||
|
case 'WARN': |
||||
|
console.warn(logMessage + logData + logDuration); |
||||
|
break; |
||||
|
case 'ERROR': |
||||
|
console.error(logMessage + logData + logDuration); |
||||
|
break; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Record performance metrics |
||||
|
*/ |
||||
|
recordMetric(type: 'fetch' | 'notify' | 'callback', duration: number, success: boolean): void { |
||||
|
switch (type) { |
||||
|
case 'fetch': |
||||
|
this.performanceMetrics.fetchTimes.push(duration); |
||||
|
break; |
||||
|
case 'notify': |
||||
|
this.performanceMetrics.notifyTimes.push(duration); |
||||
|
break; |
||||
|
case 'callback': |
||||
|
this.performanceMetrics.callbackTimes.push(duration); |
||||
|
break; |
||||
|
} |
||||
|
|
||||
|
if (success) { |
||||
|
this.performanceMetrics.successCount++; |
||||
|
} else { |
||||
|
this.performanceMetrics.failureCount++; |
||||
|
} |
||||
|
|
||||
|
// Keep only recent metrics
|
||||
|
this.trimMetrics(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Get health status |
||||
|
*/ |
||||
|
async getHealthStatus(): Promise<HealthStatus> { |
||||
|
const now = Date.now(); |
||||
|
const recentLogs = this.eventLogs.filter(log => now - log.timestamp < 24 * 60 * 60 * 1000); // Last 24 hours
|
||||
|
|
||||
|
// Calculate next runs (mock implementation)
|
||||
|
const nextRuns = this.calculateNextRuns(); |
||||
|
|
||||
|
// Get last outcomes from recent logs
|
||||
|
const lastOutcomes = recentLogs |
||||
|
.filter(log => log.eventCode.startsWith('DNP-FETCH-') || log.eventCode.startsWith('DNP-NOTIFY-')) |
||||
|
.slice(0, 10) |
||||
|
.map(log => log.eventCode); |
||||
|
|
||||
|
// Calculate cache age (mock implementation)
|
||||
|
const cacheAgeMs = this.calculateCacheAge(); |
||||
|
|
||||
|
// Check if stale armed
|
||||
|
const staleArmed = cacheAgeMs ? cacheAgeMs > 3600000 : true; // 1 hour
|
||||
|
|
||||
|
// Calculate queue depth
|
||||
|
const queueDepth = recentLogs.filter(log => |
||||
|
log.eventCode.includes('QUEUE') || log.eventCode.includes('RETRY') |
||||
|
).length; |
||||
|
|
||||
|
// Circuit breaker status
|
||||
|
const circuitBreakers = this.getCircuitBreakerStatus(); |
||||
|
|
||||
|
// Performance metrics
|
||||
|
const performance = this.calculatePerformanceMetrics(); |
||||
|
|
||||
|
return { |
||||
|
nextRuns, |
||||
|
lastOutcomes, |
||||
|
cacheAgeMs, |
||||
|
staleArmed, |
||||
|
queueDepth, |
||||
|
circuitBreakers, |
||||
|
performance |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Get recent event logs |
||||
|
*/ |
||||
|
getRecentLogs(limit: number = 50): EventLog[] { |
||||
|
return this.eventLogs.slice(0, limit); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Get performance metrics |
||||
|
*/ |
||||
|
getPerformanceMetrics(): PerformanceMetrics { |
||||
|
return { ...this.performanceMetrics }; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Reset performance metrics |
||||
|
*/ |
||||
|
resetMetrics(): void { |
||||
|
this.performanceMetrics = { |
||||
|
fetchTimes: [], |
||||
|
notifyTimes: [], |
||||
|
callbackTimes: [], |
||||
|
successCount: 0, |
||||
|
failureCount: 0, |
||||
|
lastReset: Date.now() |
||||
|
}; |
||||
|
|
||||
|
this.logEvent('INFO', 'DNP-METRICS-RESET', 'Performance metrics reset'); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Compact old logs (called by cleanup job) |
||||
|
*/ |
||||
|
compactLogs(olderThanMs: number = 30 * 24 * 60 * 60 * 1000): number { // 30 days
|
||||
|
const cutoff = Date.now() - olderThanMs; |
||||
|
const initialCount = this.eventLogs.length; |
||||
|
|
||||
|
this.eventLogs = this.eventLogs.filter(log => log.timestamp >= cutoff); |
||||
|
|
||||
|
const removedCount = initialCount - this.eventLogs.length; |
||||
|
if (removedCount > 0) { |
||||
|
this.logEvent('INFO', 'DNP-LOGS-COMPACTED', `Removed ${removedCount} old logs`); |
||||
|
} |
||||
|
|
||||
|
return removedCount; |
||||
|
} |
||||
|
|
||||
|
// Private helper methods
|
||||
|
private generateEventId(): string { |
||||
|
return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; |
||||
|
} |
||||
|
|
||||
|
private trimMetrics(): void { |
||||
|
if (this.performanceMetrics.fetchTimes.length > this.maxMetrics) { |
||||
|
this.performanceMetrics.fetchTimes = this.performanceMetrics.fetchTimes.slice(-this.maxMetrics); |
||||
|
} |
||||
|
if (this.performanceMetrics.notifyTimes.length > this.maxMetrics) { |
||||
|
this.performanceMetrics.notifyTimes = this.performanceMetrics.notifyTimes.slice(-this.maxMetrics); |
||||
|
} |
||||
|
if (this.performanceMetrics.callbackTimes.length > this.maxMetrics) { |
||||
|
this.performanceMetrics.callbackTimes = this.performanceMetrics.callbackTimes.slice(-this.maxMetrics); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private calculateNextRuns(): number[] { |
||||
|
// Mock implementation - would calculate from actual schedules
|
||||
|
const now = Date.now(); |
||||
|
return [ |
||||
|
now + (60 * 60 * 1000), // 1 hour from now
|
||||
|
now + (24 * 60 * 60 * 1000) // 24 hours from now
|
||||
|
]; |
||||
|
} |
||||
|
|
||||
|
private calculateCacheAge(): number | null { |
||||
|
// Mock implementation - would get from actual cache
|
||||
|
return 1800000; // 30 minutes
|
||||
|
} |
||||
|
|
||||
|
private getCircuitBreakerStatus(): { total: number; open: number; failures: number } { |
||||
|
// Mock implementation - would get from actual circuit breakers
|
||||
|
return { |
||||
|
total: 3, |
||||
|
open: 1, |
||||
|
failures: 5 |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
private calculatePerformanceMetrics(): { |
||||
|
avgFetchTime: number; |
||||
|
avgNotifyTime: number; |
||||
|
successRate: number; |
||||
|
} { |
||||
|
const fetchTimes = this.performanceMetrics.fetchTimes; |
||||
|
const notifyTimes = this.performanceMetrics.notifyTimes; |
||||
|
const totalOperations = this.performanceMetrics.successCount + this.performanceMetrics.failureCount; |
||||
|
|
||||
|
return { |
||||
|
avgFetchTime: fetchTimes.length > 0 ? |
||||
|
fetchTimes.reduce((a, b) => a + b, 0) / fetchTimes.length : 0, |
||||
|
avgNotifyTime: notifyTimes.length > 0 ? |
||||
|
notifyTimes.reduce((a, b) => a + b, 0) / notifyTimes.length : 0, |
||||
|
successRate: totalOperations > 0 ? |
||||
|
this.performanceMetrics.successCount / totalOperations : 0 |
||||
|
}; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// Singleton instance
|
||||
|
export const observability = new ObservabilityManager(); |
||||
|
|
||||
|
// Event code constants
|
||||
|
export const EVENT_CODES = { |
||||
|
FETCH_START: 'DNP-FETCH-START', |
||||
|
FETCH_SUCCESS: 'DNP-FETCH-SUCCESS', |
||||
|
FETCH_FAILURE: 'DNP-FETCH-FAILURE', |
||||
|
FETCH_RETRY: 'DNP-FETCH-RETRY', |
||||
|
NOTIFY_START: 'DNP-NOTIFY-START', |
||||
|
NOTIFY_SUCCESS: 'DNP-NOTIFY-SUCCESS', |
||||
|
NOTIFY_FAILURE: 'DNP-NOTIFY-FAILURE', |
||||
|
NOTIFY_SKIPPED_TTL: 'DNP-NOTIFY-SKIPPED-TTL', |
||||
|
CALLBACK_START: 'DNP-CB-START', |
||||
|
CALLBACK_SUCCESS: 'DNP-CB-SUCCESS', |
||||
|
CALLBACK_FAILURE: 'DNP-CB-FAILURE', |
||||
|
CALLBACK_RETRY: 'DNP-CB-RETRY', |
||||
|
CALLBACK_CIRCUIT_OPEN: 'DNP-CB-CIRCUIT-OPEN', |
||||
|
CALLBACK_CIRCUIT_CLOSE: 'DNP-CB-CIRCUIT-CLOSE', |
||||
|
BOOT_RECOVERY: 'DNP-BOOT-RECOVERY', |
||||
|
SCHEDULE_UPDATE: 'DNP-SCHEDULE-UPDATE', |
||||
|
CACHE_HIT: 'DNP-CACHE-HIT', |
||||
|
CACHE_MISS: 'DNP-CACHE-MISS', |
||||
|
TTL_EXPIRED: 'DNP-TTL-EXPIRED', |
||||
|
METRICS_RESET: 'DNP-METRICS-RESET', |
||||
|
LOGS_COMPACTED: 'DNP-LOGS-COMPACTED' |
||||
|
} as const; |
Loading…
Reference in new issue