""" Web Push Notification Service This module provides a Flask-based web service for managing and sending web push notifications to subscribed clients. It implements the Web Push Protocol with VAPID authentication. Key Features: - VAPID key management for secure push notifications - Subscription management (subscribe/unsubscribe) - Scheduled daily notifications - Test notification endpoints - Support for direct and check-type notifications Environment Variables: ADMIN_PASSWORD: Password for admin endpoints (default: 'admin') PUSH_SERVER_VERSION: Version identifier for the push server SECONDS_BETWEEN_NOTIFICATIONS: Interval between notification checks (default: 300) SQLALCHEMY_DATABASE_URI: SQLite database path (format: "sqlite:////path/to/db") Technical Details: - Uses Flask for web framework - SQLAlchemy for database operations - pywebpush for Web Push Protocol implementation - Threading for background notification processing - Cryptography for VAPID key generation Security Notes: - Implements VAPID authentication - Admin endpoints require basic auth - Database connections use SQLite with configurable path - Message size limits enforced Author: Matthew Raymer Version: 1.0.0 """ import structlog logger = structlog.get_logger() from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec from flask import Flask, request, jsonify, Response from models import db, VAPIDKey, Settings, Subscription from pywebpush import webpush, WebPushException from sqlalchemy import and_ from typing import Dict, Tuple import base64 import datetime import json import os import threading import time import sqlalchemy.exc import cryptography.exceptions CONTACT_EMAIL = "mailto:info@timesafari.app" # On Time Safari, this title bypasses the filters and shows the message directly. TITLE_DIRECT_NOTIFICATION = 'DIRECT_NOTIFICATION' # On Time Safari, this title triggers the API check for the user's latest data. TITLE_DAILY_INDIVIDUAL_CHECK = 'DAILY_CHECK' PUSH_SERVER_VERSION = os.getenv('PUSH_SERVER_VERSION') SECONDS_BETWEEN_NOTIFICATIONS_STR = os.getenv( 'SECONDS_BETWEEN_NOTIFICATIONS', '60') SECONDS_BETWEEN_NOTIFICATIONS = int(SECONDS_BETWEEN_NOTIFICATIONS_STR) app = Flask(__name__) class WebPushService(): """ Web Push Notification Service Manager This class manages web push notification services including VAPID key management, subscription handling, and notification delivery. It integrates with Flask to provide HTTP endpoints and maintains a background thread for scheduled notifications. Key Responsibilities: - VAPID key management and generation - Push notification delivery - Subscription management - Daily notification scheduling - Health check endpoints Attributes: latest_subscription_run (str): ISO formatted timestamp of last notification run app (Flask): Flask application instance daily_notification_thread (threading.Thread): Background thread for notifications Public Endpoints: - /web-push/regenerate-vapid (POST): Regenerate VAPID keys (admin auth required) - /web-push/ping (GET): Service health check - /web-push/vapid (GET): Retrieve current VAPID public key - /web-push/subscribe (POST): Register new push subscription - /web-push/unsubscribe (POST): Remove push subscription - /web-push/send-test (POST): Send test notification Database Models Used: - VAPIDKey: Stores VAPID key pairs - Settings: Stores service configuration - Subscription: Stores push notification subscriptions Security Features: - Admin authentication for sensitive endpoints - VAPID authentication for push messages - Input validation - Subscription verification - Error handling for expired subscriptions Threading: Runs a background thread that: - Checks for pending notifications every SECONDS_BETWEEN_NOTIFICATIONS - Processes notifications based on configured notification times - Handles subscription cleanup for expired endpoints Error Handling: - Catches and logs push notification failures - Handles expired subscriptions - Manages database transaction errors - Provides detailed error responses Dependencies: - Flask for web framework - SQLAlchemy for database operations - pywebpush for Web Push Protocol - cryptography for key generation Example Usage: app = Flask(__name__) web_push_service = WebPushService(app, "production") # Service automatically starts background thread and initializes endpoints Author: Matthew Raymer Version: 1.0.0 """ latest_subscription_run = None def __init__(self, app, config_name: str) -> None: """ Initialize the Web Push Service with Flask app and configuration. Args: app (Flask): Flask application instance to attach service endpoints config_name (str): Configuration profile name (e.g., "production", "development") Initializes: - Database connection - VAPID keys if not present - Background notification thread - Service endpoints - Configuration settings Raises: RuntimeError: If database initialization fails Exception: If VAPID key generation fails """ # Setting the application instance self.app = app self.app.add_url_rule('/web-push/regenerate-vapid', view_func=self.regenerate_vapid, methods=['POST']) self.app.add_url_rule( '/web-push/ping', view_func=self.ping, methods=['GET']) # Setting the database URI for the application, with an absolute path db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:////app/instance/data/webpush.db') # This relative path works in docker-compose # This relative path works on a local run if you link to the dir with app.py from "var/app-instance" db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:///data/webpush.db') self.app.config['SQLALCHEMY_DATABASE_URI'] = db_uri # Initializing the database with the application db.init_app(self.app) # Creating a context for the application and initializing services with self.app.app_context(): self._initialize() # Creating and starting a thread to send daily notifications self.daily_notification_thread = threading.Thread( target=self._send_daily_notifications) self.daily_notification_thread.start() def _generate_and_save_vapid_keys(self) -> None: """ Generate and store new VAPID keys for push notification authentication. This method handles the complete lifecycle of VAPID key generation: 1. Generates a new ECDSA private key using SECP256R1 curve 2. Derives the corresponding public key 3. Serializes both keys to base64 format 4. Stores the key pair in the database Technical Details: - Uses SECP256R1 elliptic curve for key generation - Public key format: X9.62 uncompressed point encoding - Private key format: PKCS#8 DER encoding - Both keys are base64 encoded for storage - No encryption is used for private key storage (consider as enhancement) Database Impact: - Creates new record in VAPIDKey table - Previous keys should be deleted before calling this method Security Considerations: - Private key is stored unencrypted - Key generation uses cryptographically secure random number generator - Uses standard cryptographic primitives from cryptography library Raises: Exception: If key generation, serialization, or database operations fail Error details are logged before re-raising Note: This is an internal method used during initialization and key rotation. For key rotation, ensure all existing subscriptions are updated or removed before rotating keys. """ try: # Generating a private key using the elliptic curve SECP256R1 private_key = ec.generate_private_key( ec.SECP256R1(), default_backend()) # Serializing and encoding the public key to base64 format public_key_bytes = private_key.public_key().public_bytes( encoding=serialization.Encoding.X962, format=serialization.PublicFormat.UncompressedPoint ) public_key_base64 = base64.b64encode(public_key_bytes).decode() # Serializing and encoding the private key to base64 format private_key_bytes = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) private_key_base64 = base64.b64encode(private_key_bytes).decode() # Saving the keys to the database key = VAPIDKey(public_key=public_key_base64, private_key=private_key_base64) db.session.add(key) db.session.commit() except Exception as e: logger.error("error_generating_vapid_keys", error=str(e)) raise e def _initialize(self) -> None: """ Initialize the WebPushService by ensuring VAPID keys exist. This method is called during service startup to ensure the necessary VAPID (Voluntary Application Server Identification) infrastructure is in place. Workflow: 1. Checks for existing VAPID keys in database 2. If no keys exist, generates and stores new key pair 3. Maintains exactly one active VAPID key pair at all times Database Operations: - Reads from VAPIDKey table - May write to VAPIDKey table if no keys exist - Uses SQLAlchemy session for transaction management Dependencies: - Requires active database connection - Must be called within Flask application context - Depends on _generate_and_save_vapid_keys for key generation Security Considerations: - Ensures VAPID authentication is always available - Maintains cryptographic identity of the service - Critical for secure push notification delivery Error Handling: - Database errors are propagated to caller - Key generation errors are propagated to caller Note: This is an internal method called automatically during service initialization. Manual calls are not typically necessary unless recovering from a failure state. """ # Checking if there are any VAPID keys in the database if not VAPIDKey.query.first(): # Generating and saving VAPID keys if none are found self._generate_and_save_vapid_keys() @staticmethod def _send_push_notification(subscription_info: Dict, message: Dict, vapid_private_key: str) -> Dict[str, any]: """ Send a web push notification to a subscribed client. This method handles the delivery of push notifications using the Web Push Protocol, including VAPID authentication and error handling. Args: subscription_info (Dict): Push subscription information containing: - endpoint: URL for the push service - keys: Dict containing: - p256dh: Client's public key - auth: Client's auth secret message (Dict): Notification payload containing: - title: Notification title (required) - message: Notification body (optional) vapid_private_key (str): Base64-encoded private key for VAPID authentication Returns: Dict[str, any]: Result containing: - success (bool): True if notification was sent (status code 201) - message (str): Response text or error message - result (Response|bool): Response object if successful, False if failed - unsubscribed (bool): Optional, True if subscription was removed - error (Exception): Optional, present if an error occurred Technical Details: - Uses pywebpush library for Web Push Protocol implementation - Implements mandatory 1-second delay after sending - Automatically handles subscription cleanup for 410 Gone responses - Uses VAPID for push service authentication Error Handling: - Catches and processes WebPushException - Logs errors with ISO formatted timestamps - Handles unsubscribe (410 Gone) responses - Removes invalid subscriptions from database Database Impact: - May delete from Subscription table on 410 Gone responses - Uses SQLAlchemy session for transaction management Security Considerations: - Requires valid VAPID authentication - Handles secure key transmission - Implements proper subscription cleanup Note: The 1-second delay after sending is a workaround for reliability. Consider monitoring and adjusting this delay based on service requirements. """ try: result = webpush( subscription_info=subscription_info, data=json.dumps(message), vapid_private_key=vapid_private_key, vapid_claims={"sub": CONTACT_EMAIL} ) # "because sometimes that's what I had to do to make it work!" - Matthew time.sleep(1) return {"success": result.status_code == 201, "message": result.text, "result": result} except WebPushException as ex: now = datetime.datetime.now().isoformat() endpoint = subscription_info['endpoint'] logger.error("push_notification_failed", endpoint=endpoint, error=str(ex), timestamp=now) unsubscribed_msg = '410 Gone' unsubscribed = False if unsubscribed_msg in ex.args[0]: subscription = Subscription.query.filter_by( endpoint=endpoint).first() if subscription: db.session.delete(subscription) db.session.commit() logger.info("subscription_deleted", endpoint=endpoint, reason="410_gone") unsubscribed = True else: logger.warning("subscription_not_found", endpoint=endpoint) else: logger.error("push_notification_error", endpoint=endpoint, error=ex.args[0]) return {"success": False, "message": str(ex), "error": ex, "unsubscribed": unsubscribed} def _send_daily_notifications(self) -> None: """ Process and send scheduled notifications to all eligible subscribers. This method runs in a continuous loop as a background thread, processing notifications based on subscribers' configured notification times. Implementation Notes: The notification time logic stores HH:MM for desired notification times. While direct time comparison is complex, this approach was chosen for simplicity. A potential improvement would be to store next notification time per record and update via SQL calculations, trading more DB updates for clearer logic. Workflow: 1. Runs in infinite loop with configurable sleep interval 2. For each iteration: - Checks if notification process is already running - Determines time window for notifications - Processes subscriptions within that window - Updates notification tracking in settings Time Window Logic: - If previous notification was before today: Start: Max(previous_end_time, yesterday_at_current_time) End: Beginning of today (00:00) - Otherwise: Start: Previous notification end time End: Current time Database Operations: - Reads from Settings table for notification tracking - Updates Settings.running_notify_end_time during processing - Updates Settings.prev_notify_end_time after completion - Reads from Subscription table for eligible notifications - May delete from Subscription table on failed deliveries Notification Types: - TITLE_DIRECT_NOTIFICATION: Shows message directly - TITLE_DAILY_INDIVIDUAL_CHECK: Triggers client API check Default message provided for DIRECT_NOTIFICATION if none specified Error Handling: - Logs all notification failures - Handles subscription cleanup for failed deliveries - Maintains notification tracking even on failures - Prevents concurrent notification processing Thread Safety: - Uses Settings.running_notify_end_time as process lock - Updates notification timestamps atomically - Handles concurrent access to subscription data Performance Considerations: - Processes notifications in batches by time window - Implements configurable delay between iterations - Logs performance metrics for monitoring Dependencies: - Requires Flask application context - Needs active database connection - Uses WebPushService._send_push_notification for delivery Configuration: - Sleep interval: SECONDS_BETWEEN_NOTIFICATIONS environment variable - Default message: Hardcoded for DIRECT_NOTIFICATION type Note: This method runs indefinitely in a background thread. Termination occurs only when the application shuts down. """ while True: now = datetime.datetime.now() logger.info("starting_subscription_check", timestamp=now.isoformat()) # Creating a context for the application to enable database operations with self.app.app_context(): # Retrieve the VAPID key from the database vapid_key = VAPIDKey.query.first() # Determine the beginning and end time to check for subscriptions settings = Settings.query.first() if settings.running_notify_end_time is None: # only do this if we're not already inside one of these loops # get the previous notify end time from the DB as a datetime prev_notify_end_time = datetime.datetime.fromisoformat( settings.prev_notify_end_time) # if it's before midnight this morning if prev_notify_end_time < now.replace( hour=0, minute=0, second=0, microsecond=0): # catch us up to the beginning of today # make the start time the later of: prevNotifyEndTime # or yesterday at this time start_time = max( prev_notify_end_time, # if the current time is later in the day, use that # because the next run will pick up everything from midnight until now today now.replace(second=0, microsecond=0) - datetime.timedelta(days=1) ) # make the end time right at midnight at the beginning of today end_time = now.replace( hour=0, minute=0, second=0, microsecond=0) start_minute = start_time.strftime('%H:%M') # (we'd never catch anything if we used a non-zero # start_minute and "00:00" as the end_minute) end_minute = '23:60' # gotta catch "23:59", too else: # the start time is OK start_time = prev_notify_end_time # the end time is now end_time = now.replace(second=0, microsecond=0) start_minute = start_time.strftime('%H:%M') end_minute = end_time.strftime('%H:%M') # This really should update & continue only if the # running_notify_end_time is still None, # just in case another thread started. settings.running_notify_end_time = end_time.isoformat() # set all subscriptions without a time to the current time # (these won't be picked-up in the current run, since the # current minute is the end minute) Subscription.query.filter_by(notify_time=None).update( {Subscription.notify_time: now.strftime('%H:%M')}) db.session.commit() # this check was generated by Copilot; it's probably unnecessary if settings.running_notify_end_time == end_time.isoformat(): # Now get the 'HH:MM' value for start & end so we can # compare to the notify_time field # get all the subscriptions that have a notify_time # between start_minute inclusive and end_minute exclusive all_subscriptions = Subscription.query.filter( and_(Subscription.notify_time >= start_minute, Subscription.notify_time < end_minute) ) # Send a push notification to each subscribed client num_subscriptions = all_subscriptions.count() for subscription in all_subscriptions: subscription_info = { "endpoint": subscription.endpoint, "keys": { "p256dh": subscription.p256dh, "auth": subscription.auth } } # Construct the push notification message # The title value is a key, defaulting to trigger # the device to apply logic and customize both # title and message. # See https://gitea.anomalistdesign.com/trent_larson/crowd-funder-for-time-pwa/src/commit/1e6159869fc28ca6e6b5b3d186617d75705100b4/sw_scripts/additional-scripts.js#L65 payload = {"title": subscription.notify_type} if subscription.message: payload['message'] = subscription.message elif subscription.notify_type == TITLE_DIRECT_NOTIFICATION: # They should get some message. payload['message'] = ( "Just a friendly reminder: click and share some gratitude with the world.") result = WebPushService._send_push_notification( subscription_info, payload, vapid_key.private_key) logger.info("subscription_notification_result", subscription_id=subscription.id, success=result['success'], message=result['message']) settings.prev_notify_end_time = end_time.isoformat() settings.running_notify_end_time = None db.session.commit() logger.info("notification_batch_complete", count=num_subscriptions, timestamp=now.isoformat()) else: logger.error("failed_to_update_running_notify_end_time", timestamp=now.isoformat()) else: logger.info("subscription_check_skipped", reason="already_running", timestamp=now.isoformat()) self.latest_subscription_run = now.isoformat() # Sleep before repeating time.sleep(SECONDS_BETWEEN_NOTIFICATIONS) # This is an endpoint, routed in __init__ def ping(self) -> str: """ Endpoint to show liveness info Returns: - Response: Text with some subscription-run info """ return f"pong ... version {PUSH_SERVER_VERSION} ... with latest subscription run at {self.latest_subscription_run}" # This is an endpoint, routed in __init__ def regenerate_vapid(self) -> Tuple[Response, int, dict[str, str]] | Tuple[Response, int]: """ Regenerate VAPID keys for push notification authentication. This endpoint provides secure rotation of VAPID keys used for push notification authentication. It requires admin authentication to prevent unauthorized key rotation. Endpoint Details: URL: /web-push/regenerate-vapid Method: POST Auth Required: Basic Authentication username: admin password: ADMIN_PASSWORD environment variable (default: 'admin') Authentication: - Uses HTTP Basic Authentication - Credentials must match configured admin values - Returns 401 with WWW-Authenticate header on auth failure Process Flow: 1. Validates admin credentials 2. Deletes existing VAPID keys from database 3. Generates and stores new key pair 4. Returns success/failure response Database Impact: - Deletes all records from VAPIDKey table - Creates new record in VAPIDKey table - Uses transaction to ensure atomic updates Security Considerations: - Requires admin authentication - Invalidates all existing VAPID keys - Should be used cautiously as it affects all subscriptions - Consider notifying subscribers before rotation Returns: Union[ Tuple[Response, int, dict[str, str]], # Auth failure case Tuple[Response, int] # Success/other failure cases ]: - Success: ({"success": True, "message": str}, 200) - Auth Failure: ({"error": str}, 401, {"WWW-Authenticate": str}) - Other Failure: ({"success": False, "message": str}, 500) Example Usage: curl -X POST -H "Authorization: Basic YWRtaW46YWRtaW4=" localhost:3000/web-push/regenerate-vapid Note: Key rotation may impact existing subscriptions. Consider implementing a migration strategy for existing subscriptions when rotating keys. """ envPassword = os.getenv('ADMIN_PASSWORD', 'admin') auth = request.authorization if (auth is None or auth.username is None or auth.username != 'admin' or auth.password is None or auth.password != envPassword): return ( jsonify(error='Wrong password'), 401, {'WWW-Authenticate': 'Basic realm="Login Required"'} ) # Creating a context for the application to enable database operations try: with self.app.app_context(): VAPIDKey.query.delete() db.session.commit() self._generate_and_save_vapid_keys() return jsonify(success=True, message="VAPID keys regenerated successfully"), 200 except (sqlalchemy.exc.SQLAlchemyError, cryptography.exceptions.InvalidKey) as e: return jsonify(success=False, message=f'Error regenerating VAPID keys: {str(e)}'), 500 @staticmethod @app.route('/web-push/vapid') def vapid() -> Response: """ Retrieve the current VAPID public key for push notification authentication. This endpoint provides the public VAPID key needed by web clients to set up push notification subscriptions. The key is used as part of the Web Push Protocol to authenticate notification requests. Endpoint Details: URL: /web-push/vapid Method: GET Auth Required: None Returns: Response: JSON object containing: vapidKey (str): Base64-encoded VAPID public key Example Response: { "vapidKey": "BDd3_hVL9fZi9Ybo2UUzA284..." } Error Handling: - Returns 500 if no VAPID key is found in database - Database errors propagate to global error handler Security Considerations: - Public key is safe to expose - No authentication required as this is public information - Key rotation handled by separate admin endpoint Usage Example: fetch('/web-push/vapid') .then(response => response.json()) .then(data => subscribeUserToPush(data.vapidKey)); """ # Retrieving the VAPID key from the database key = VAPIDKey.query.first() # Returning the public key in a JSON response return jsonify(vapidKey=key.public_key) @staticmethod @app.route('/web-push/subscribe', methods=['POST']) def subscribe() -> Tuple[Response, int]: """ Register a new web push notification subscription. This endpoint stores subscription details and sends a confirmation notification to verify the subscription is working correctly. Endpoint Details: URL: /web-push/subscribe Method: POST Auth Required: None Request Body: { "endpoint": str, # Push service URL for the subscription "keys": { "p256dh": str, # Client's public key "auth": str # Client's auth secret }, "notifyTime": { "utcHour": int, # Hour in UTC (0-23) "minute": int # Optional, defaults to 0 (0-59) }, "notifyType": str, # Optional, defaults to DAILY_CHECK # Valid values: DAILY_CHECK, DIRECT_NOTIFICATION "message": str # Optional, max 100 chars, used with DIRECT_NOTIFICATION } Returns: Tuple[Response, int]: JSON response and HTTP status Success: ({"success": true, "message": str}, 200) Error: ({"success": false, "message": str}, 400|500) Error Cases: - 400: Missing/invalid request parameters - 400: Message exceeds 100 characters - 400: Invalid notifyType - 500: VAPID key not available Security Considerations: - Validates all input parameters - Enforces message length limits - Stores subscription with creation timestamp - Associates subscription with current VAPID key Example Usage: fetch('/web-push/subscribe', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({ endpoint: pushSubscription.endpoint, keys: pushSubscription.keys, notifyTime: {utcHour: 14, minute: 30}, notifyType: "DAILY_CHECK" }) }); """ # Retrieving the content from the incoming request content = request.json if (content is None) or ('endpoint' not in content) or ('keys' not in content): return jsonify(success=False, message="Missing subscription information"), 400 if ('p256dh' not in content['keys']) or ('auth' not in content['keys']): return jsonify(success=False, message="Missing subscription keys information"), 400 if ('notifyTime' not in content) or ('utcHour' not in content['notifyTime']): return jsonify(success=False, message="Missing notifyTime information"), 400 if ('notifyType' in content) and (content['notifyType'] not in [TITLE_DIRECT_NOTIFICATION, TITLE_DAILY_INDIVIDUAL_CHECK]): return jsonify(success=False, message="Invalid notifyType"), 400 # Retrieving the VAPID key from the database vapid_key = VAPIDKey.query.first() # Checking if the VAPID key is available if not vapid_key: return jsonify(success=False, message="No VAPID keys available"), 500 # Constructing the notify_time string notify_hour = content['notifyTime']['utcHour'] if 'minute' in content['notifyTime']: notify_minute = content['notifyTime']['minute'] else: notify_minute = 0 notify_time = '{:02d}'.format( notify_hour) + ":" + '{:02d}'.format(notify_minute) notify_type = TITLE_DAILY_INDIVIDUAL_CHECK if 'notifyType' in content: notify_type = content['notifyType'] # check that the message is 100 characters or less message = None if 'message' in content: if len(content['message']) > 100: return jsonify(success=False, message="Message is too long. Max 100 characters."), 400 else: message = content['message'] # Creating a new Subscription instance with the provided data subscription = Subscription(auth=content['keys']['auth'], created_date=datetime.datetime.now().isoformat(), endpoint=content['endpoint'], message=message, notify_time=notify_time, notify_type=notify_type, p256dh=content['keys']['p256dh'], vapid_key_id=vapid_key.id) # Saving the subscription data to the database db.session.add(subscription) db.session.commit() # Constructing the subscription information for the push notification subscription_info = { "endpoint": subscription.endpoint, "keys": { "p256dh": subscription.p256dh, "auth": subscription.auth } } # Creating a confirmation message for the push notification message = {"title": "Subscription Successful", "message": "Thank you for subscribing!"} # Sending the confirmation push notification result = WebPushService._send_push_notification( subscription_info, message, vapid_key.private_key) # Returning the operation status return jsonify(success=result["success"], message=result["message"]), 200 @staticmethod @app.route('/web-push/unsubscribe', methods=['POST']) def unsubscribe() -> Tuple[Response, int]: """ Remove a web push notification subscription. This endpoint removes subscription(s) for a given endpoint, optionally filtered by notification type. Endpoint Details: URL: /web-push/unsubscribe Method: POST Auth Required: None Request Body: { "endpoint": str, # Required: Push service URL to unsubscribe "notifyType": str # Optional: If provided, only removes subscriptions # of this type (DAILY_CHECK or DIRECT_NOTIFICATION) } Returns: Tuple[Response, int]: JSON response and HTTP status Success: ({"success": true, "message": str}, 200) Error: ({"success": false, "message": str}, 400) Error Cases: - 400: Missing endpoint parameter Behavior: - If notifyType is provided: Removes only subscriptions matching both endpoint and notifyType - If notifyType is omitted: Removes all subscriptions for the endpoint Example Usage: # Remove specific notification type fetch('/web-push/unsubscribe', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({ endpoint: subscription.endpoint, notifyType: "DAILY_CHECK" }) }); # Remove all notifications fetch('/web-push/unsubscribe', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({ endpoint: subscription.endpoint }) }); """ # Retrieving the endpoint from the incoming request content = request.json endpoint = content.get('endpoint') if endpoint is None: return jsonify(success=False, message="Missing endpoint information"), 400 # Searching for the subscription in the database using the endpoint if 'notifyType' in content: notify_type = content['notifyType'] logger.info("unsubscribe_request", endpoint=endpoint, notify_type=notify_type) db.session.query( Subscription).filter( and_(Subscription.endpoint == endpoint, Subscription.notify_type == notify_type) ).delete(synchronize_session=False) db.session.commit() else: logger.info("unsubscribe_request", endpoint=endpoint, notify_type="all") db.session.query(Subscription).filter( Subscription.endpoint == endpoint).delete(synchronize_session=False) db.session.commit() return jsonify(success=True, message="Subscription deleted successfully"), 200 @staticmethod @app.route('/web-push/send-test', methods=['POST']) def send_test() -> Tuple[Response, int]: """ Send a test push notification to a specific client. This endpoint verifies push notification functionality by sending a test message to a specified subscription. Endpoint Details: URL: /web-push/send-test Method: POST Auth Required: None Request Body: { "endpoint": str, # Required: Push service URL "keys": { "p256dh": str, # Required: Client's public key "auth": str # Required: Client's auth secret }, "title": str, # Optional: Custom notification title # (defaults to "Test Notification") "message": str # Optional: Custom notification message # (defaults to "This is a test notification.") } Returns: Tuple[Response, int]: JSON response and HTTP status Success: ({"success": true, "message": str}, 200) Error: ({"success": false, "message": str}, 400|404) Error Cases: - 400: Missing required subscription parameters - 404: Subscription not found in database Example Usage: fetch('/web-push/send-test', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({ endpoint: subscription.endpoint, keys: subscription.keys, title: "Custom Test", message: "Hello, World!" }) }); Note: This endpoint is useful for: - Verifying subscription setup - Testing notification delivery - Debugging client-side notification handling """ # Retrieving the subscription information from the incoming request content = request.json endpoint = content.get('endpoint') p256dh = content.get('keys', {}).get('p256dh') auth = content.get('keys', {}).get('auth') if (endpoint is None) or (p256dh is None) or (auth is None): return jsonify({"success": False, "message": "Missing subscription information"}), 400 # Looking up the subscription in the database subscription = Subscription.query.filter_by( endpoint=endpoint, p256dh=p256dh, auth=auth).first() # If the subscription is found, call the _send_push_notification method if subscription: subscription_info = { "endpoint": endpoint, "keys": { "p256dh": p256dh, "auth": auth } } title = "Test Notification" if "title" in content: title = content['title'] message = "This is a test notification." if "message" in content: message = content['message'] vapid_key = VAPIDKey.query.filter_by( id=subscription.vapid_key_id).first() result = WebPushService._send_push_notification( subscription_info, {"title": title, "message": message}, vapid_key.private_key ) logger.info("test_notification_sent", success=result["success"], message=result["message"]) return jsonify(success=result["success"], message=result["message"]), 200 else: logger.error("test_notification_failed", error="subscription_not_found", request=content) return jsonify({"success": False, "message": "Subscription not found"}), 404 web_push_service = WebPushService(app, "app")