""" Environment variables: - SQLALCHEMY_DATABASE_URI: path to sqlite file, starting with "sqlite:////" - ADMIN_PASSWORD: password for admin user for sensitive endpoints, defaults to 'admin' """ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec from flask import Flask, request, jsonify, Response from models import db, VAPIDKey, Settings, Subscription from pywebpush import webpush, WebPushException from sqlalchemy import and_ from typing import Dict, Tuple import base64 import datetime import json import os import threading import time CONTACT_EMAIL = "mailto:info@timesafari.app" PUSH_SERVER_VERSION = os.getenv('PUSH_SERVER_VERSION') app = Flask(__name__) class WebPushService(): latest_subscription_run = None """ This class provides services for sending web push notifications. """ def __init__(self, app, config_name: str) -> None: """ Initializes the WebPushService with the given application and configuration name. Args: - app: The application instance where the service will be attached. - config_name (str): The name of the configuration to be used. Attributes: - app: The application instance. - daily_notification_thread (threading.Thread): A thread to send daily notifications. """ # Setting the application instance self.app = app self.app.add_url_rule('/web-push/regenerate-vapid', view_func=self.regenerate_vapid, methods=['POST']) self.app.add_url_rule('/web-push/ping', view_func=self.ping, methods=['GET']) # Setting the database URI for the application db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:////app/instance/data/webpush.db') # This relative path works in docker-compose # This relative path works on a local run if you link to this dir from "var/app-instance" #db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:///data/webpush.db') self.app.config['SQLALCHEMY_DATABASE_URI'] = db_uri # Initializing the database with the application db.init_app(self.app) # Creating a context for the application and initializing services with self.app.app_context(): self._initialize() # Creating and starting a thread to send daily notifications self.daily_notification_thread = threading.Thread(target=self._send_daily_notifications) self.daily_notification_thread.start() def _generate_and_save_vapid_keys(self) -> None: """ Generates VAPID (Voluntary Application Server Identification) keys and saves them to the database. The method generates a pair of public and private keys using the elliptic curve SECP256R1. Both the public and private keys are then serialized and encoded in base64 format. The keys are then stored in the database using a VAPIDKey model. Notes: - In case of any exception during the key generation or database operations, the error is printed to the console. Raises: - Exceptions raised by the key generation or database operations are caught and printed. """ try: # Generating a private key using the elliptic curve SECP256R1 private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) # Serializing and encoding the public key to base64 format public_key_bytes = private_key.public_key().public_bytes( encoding=serialization.Encoding.X962, format=serialization.PublicFormat.UncompressedPoint ) public_key_base64 = base64.b64encode(public_key_bytes).decode() # Serializing and encoding the private key to base64 format private_key_bytes = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) private_key_base64 = base64.b64encode(private_key_bytes).decode() # Saving the keys to the database key = VAPIDKey(public_key=public_key_base64, private_key=private_key_base64) db.session.add(key) db.session.commit() except Exception as e: print(f"Error generating VAPID keys: {str(e)}") raise e def _initialize(self) -> None: """ Initializes the WebPushService by checking for the presence of VAPID keys in the database. If no VAPID keys are found in the database, this method triggers the generation and storage of new VAPID keys by invoking the `_generate_and_save_vapid_keys` method. Notes: - VAPID (Voluntary Application Server Identification) keys are essential for sending push notifications to web clients, hence the check and generation if they don't exist. """ # Checking if there are any VAPID keys in the database if not VAPIDKey.query.first(): # Generating and saving VAPID keys if none are found self._generate_and_save_vapid_keys() @staticmethod def _send_push_notification(subscription_info: Dict, message: Dict, vapid_key: VAPIDKey) -> Dict[str, any]: """ Sends a push notification using the provided subscription information, message, and VAPID key. Args: - subscription_info (Dict): The information required to send the push notification to a specific client. - message (Dict): The actual message content to be sent as the push notification. - vapid_key (VAPIDKey): The VAPID key model instance containing the private key used for sending the notification. Returns Dict with the following keys - success: True if the push notification was sent successfully, False otherwise - message: a string message with the resulting text, usually nothing on success - result: request.Response https://requests.readthedocs.io/en/latest/api.html#requests.Response if the push notification was sent successfully or False if there was an exception Notes: - The `webpush` function is used to send the notification. - In case of any exception, especially a WebPushException, the error is printed to the console. """ # Sending the push notification using the webpush function try: result = webpush( subscription_info=subscription_info, data=json.dumps(message), vapid_private_key=vapid_key.private_key, vapid_claims={"sub": CONTACT_EMAIL} ) # "because sometimes that's what I had to do to make it work!" - Matthew time.sleep(1) return {"success": result.status_code == 201, "message": result.text, "result": result} except WebPushException as ex: now = datetime.datetime.now().isoformat() endpoint = subscription_info['endpoint'] print(f"{now}: Failed to send push notification for {endpoint} -- {ex}", flush=True) unsubscribed_msg = '410 Gone' unsubscribed = False if unsubscribed_msg in ex.args[0]: subscription = Subscription.query.filter_by(endpoint=endpoint).first() # Delete the subscription if found if subscription: db.session.delete(subscription) db.session.commit() print(f"Committed delete of {subscription_info}", flush=True) unsubscribed = True else: print(f"Could not find subscription at: {endpoint}", flush=True) else: print("Error other than unsubscribed/expired.", ex.args[0], flush=True) return {"success": False, "message": str(ex), "error": ex, "unsubscribed": unsubscribed} def _send_daily_notifications(self) -> None: """ Continuously sends daily push notifications to all subscribed clients. This method: 1. Retrieves all subscription data from the database. 2. Constructs the push notification message. 3. Sends a push notification to each subscribed client. 4. Sleeps for 24 hours before repeating the process. Notes: - The method runs in an infinite loop, meaning it will keep sending notifications until the program is terminated. - The notifications are sent using the `_send_push_notification` method. - A context for the application is created to enable database operations. - The message content for the daily update is hardcoded in this method. """ while True: now = datetime.datetime.now() print(f"{now} - Starting to send subscriptions...", flush=True) # Creating a context for the application to enable database operations with self.app.app_context(): # Retrieve the VAPID key from the database vapid_key = VAPIDKey.query.first() # Construct the push notification message # The title value is a key, triggering the device to apply logic and customize both title and message. # See https://gitea.anomalistdesign.com/trent_larson/crowd-funder-for-time-pwa/src/commit/1e6159869fc28ca6e6b5b3d186617d75705100b4/sw_scripts/additional-scripts.js#L65 UPDATE_TITLE = "DAILY_CHECK" message = {"title": UPDATE_TITLE, "message": f"Update for {now}"} # Determine the beginning and end time to check for subscriptions settings = Settings.query.first() if settings.running_notify_end_time is None: # set all subscriptions without a time to the current time # (these won't be picked-up in the current run, since thie current minute is the end minute) Subscription.query.filter_by(notify_time=None).update({Subscription.notify_time: now.strftime('%H:%M')}) """ Storing the HH:MM for the desired notification time isn't a bad idea. However, the logic that compares directly to it is a bit complicated. It would be more straightforward to save the next notification time in each record and then update that every time this process runs. It would require raw SQL with some calculations and many DB updates each day, but the logic would be more clear. """ # get the previous notify end time from the DB as a datetime prev_notify_end_time = datetime.datetime.fromisoformat(settings.prev_notify_end_time) # if it's before midnight this morning, catch us up to the beginning of today: if prev_notify_end_time < now.replace(hour=0, minute=0, second=0, microsecond=0): # make the start time the later of: prevNotifyEndTime or yesterday at this time start_time = max( prev_notify_end_time, # if the current time is later in the day, use that # because the next run will pick up everything from midnight until now today now.replace(second=0, microsecond=0) - datetime.timedelta(days=1) ) # make the end time right at midnight at the beginning of today end_time = now.replace(hour=0, minute=0, second=0, microsecond=0) start_minute = start_time.strftime('%H:%M') # (we'd never catch anything if we used a non-zero start_minute and "00:00" as the end_minute) end_minute = '23:60' # gotta catch "23:59", too else: # the start time is OK start_time = prev_notify_end_time # the end time is now end_time = now.replace(second=0, microsecond=0) start_minute = start_time.strftime('%H:%M') end_minute = end_time.strftime('%H:%M') # This really should update & continue only if the running_notify_end_time is still None, # just in case another thread started. settings.running_notify_end_time = end_time.isoformat() db.session.commit() # this check was generated by Copilot; it's probably unnecessary if settings.running_notify_end_time == end_time.isoformat(): # Now get the 'HH:MM' value for start & end so we can compare to the notify_time field # get all the subscriptions that have a notify_time between start_minute inclusive and end_minute exclusive all_subscriptions = Subscription.query.filter( and_(Subscription.notify_time >= start_minute, Subscription.notify_time < end_minute) ) # Send a push notification to each subscribed client num_subscriptions = all_subscriptions.count() for subscription in all_subscriptions: subscription_info = { "endpoint": subscription.endpoint, "keys": { "p256dh": subscription.p256dh, "auth": subscription.auth } } result = WebPushService._send_push_notification(subscription_info, message, vapid_key) print( f"Result from sub {subscription.id}: success={result['success']} text={result['message']}", flush=True ) settings.prev_notify_end_time = end_time.isoformat() settings.running_notify_end_time = None db.session.commit() print(f"{now} - Finished sending {num_subscriptions} subscriptions.", flush=True) else: print(f"{now} - Failed to update running_notify_end_time", flush=True) else: print(f"{now} - Stopped because we're already running a notification check.", flush=True) self.latest_subscription_run = now.isoformat() # Sleep before repeating time.sleep(5 * 60) # This is an endpoint, routed in __init__ def ping(self) -> str: """ Endpoint to show liveness info Returns: - Response: Text with some subscription-run info """ return f"pong ... version {PUSH_SERVER_VERSION} ... with latest subscription run at {self.latest_subscription_run}" # This is an endpoint, routed in __init__ def regenerate_vapid(self) -> Tuple[Response, int, dict[str, str]] | Tuple[Response, int]: """ Endpoint to regenerate VAPID keys. This method: 1. Deletes the current VAPID keys from the database. 2. Generates and stores new VAPID keys using the `_generate_and_save_vapid_keys` method. URL: /web-push/regenerate-vapid Method: POST Header: Authentication: Basic ... Returns: - Tuple with "success" as True or False, and "message" message string Notes: - If the operation is successful, a JSON response with a success message is returned with a 200 status code. - If there's an error during the operation, a JSON response with the error message is returned with a 500 status code. """ # This default can be invoked thus: curl -X POST -H "Authorization: Basic YWRtaW46YWRtaW4=" localhost:3000/web-push/regenerate-vapid envPassword = os.getenv('ADMIN_PASSWORD', 'admin') auth = request.authorization if (auth is None or auth.username is None or auth.username != 'admin' or auth.password is None or auth.password != envPassword): return ( jsonify(error='Wrong password'), 401, {'WWW-Authenticate': 'Basic realm="Login Required"'} ) # Creating a context for the application to enable database operations try: with self.app.app_context(): # Deleting the current VAPID keys from the database VAPIDKey.query.delete() db.session.commit() # Generating and saving new VAPID keys self._generate_and_save_vapid_keys() return jsonify(success=True, message="VAPID keys regenerated successfully"), 200 except Exception as e: return jsonify(success=False, message=f'Error regenerating VAPID keys: {str(e)}'), 500 @staticmethod @app.route('/web-push/vapid') def vapid() -> Response: """ Endpoint to retrieve the current VAPID public key. This method fetches the VAPID public key from the database and returns it in a JSON response. URL: /web-push/vapid Method: GET Returns: - Response: A JSON response containing the VAPID public key. Notes: - The response contains a key "vapidKey" with the associated public key as its value. """ # Retrieving the VAPID key from the database key = VAPIDKey.query.first() # Returning the public key in a JSON response return jsonify(vapidKey=key.public_key) @staticmethod @app.route('/web-push/subscribe', methods=['POST']) def subscribe() -> Tuple[Response, int]: """ Endpoint to handle new web push subscription requests. This method: 1. Retrieves the VAPID key from the database. 2. Reads the subscription content from the incoming request. 3. Saves the subscription data to the database. 4. Sends a confirmation push notification to the new subscriber. URL: /web-push/subscribe Method: POST Returns: - A JSON response with "success" as True or False, "message" as a response message string, and potentially a "result" as a request.Response object from sending a test notification Notes: - If the operation is successful, a confirmation push notification is sent to the subscriber with a success message. - If there are no VAPID keys available, an error message is returned with a 500 status code. - There's a hardcoded 5-second sleep after saving the subscription, which might be intended for some delay before sending the confirmation. Ensure this is the desired behavior. """ # Retrieving the content from the incoming request content = request.json # Retrieving the VAPID key from the database vapid_key = VAPIDKey.query.first() # Checking if the VAPID key is available if not vapid_key: return jsonify(success=False, message="No VAPID keys available"), 500 # Constructing the notify_time string notify_time = "13:13" # random time that is in most people's waking hours (server time, typically UTC) if ('notifyTime' in content) and ('utcHour' in content['notifyTime']): notify_hour = content['notifyTime']['utcHour'] if 'minute' in content['notifyTime']: notify_minute = content['notifyTime']['minute'] else: notify_minute = 0 notify_time = '{:02d}'.format(notify_hour) + ":" + '{:02d}'.format(notify_minute) # Creating a new Subscription instance with the provided data subscription = Subscription(auth=content['keys']['auth'], endpoint=content['endpoint'], notify_time=notify_time, p256dh=content['keys']['p256dh'], vapid_key_id=vapid_key.id) # Saving the subscription data to the database db.session.add(subscription) db.session.commit() # Introducing a delay (ensure that gateway endpoint is available) # ... which I'm now commenting out because there's no pending request so it doesn't make sense... we'll see if things still work #time.sleep(10) # Constructing the subscription information for the push notification subscription_info = { "endpoint": subscription.endpoint, "keys": { "p256dh": subscription.p256dh, "auth": subscription.auth } } # Creating a confirmation message for the push notification message = {"title": "Subscription Successful", "message": "Thank you for subscribing!"} # Sending the confirmation push notification result = WebPushService._send_push_notification(subscription_info, message, vapid_key) # Returning the operation status return jsonify(success=result["success"], message=result["message"]), 200 @staticmethod @app.route('/web-push/unsubscribe', methods=['POST']) def unsubscribe() -> Tuple[Response, int]: """ Endpoint to handle web push unsubscription requests. This method: 1. Reads the endpoint from the incoming request. 2. Searches for the subscription in the database using the endpoint. 3. If found, deletes the subscription from the database. URL: /web-push/unsubscribe Method: POST Returns: - Tuple[str, int]: A JSON response indicating the success or failure of the operation, along with the appropriate HTTP status code. Notes: - If the unsubscription is successful, a JSON response with a success message is returned. - If the subscription is not found in the database, an error message is returned with a 404 status code. """ # Retrieving the endpoint from the incoming request content = request.json endpoint = content['endpoint'] # Searching for the subscription in the database using the endpoint subscription = Subscription.query.filter_by(endpoint=endpoint).first() # If the subscription is found, delete it from the database if subscription: db.session.delete(subscription) db.session.commit() return jsonify(success=True, message="Subscription deleted successfully"), 200 # If the subscription is not found, return an error message else: return jsonify(success=False, message="Subscription not found"), 404 @staticmethod @app.route('/web-push/send-test', methods=['POST']) def send_test() -> Tuple[Response, int]: """ Endpoint to send a test push notification to a specific client. This method: 1. Retrieves the subscription information from the incoming request. 2. Looks up the subscription in the database. 3. Calls the _send_push_notification method to send a test push notification. - The subscription will include the "title" and "message" if supplied in the body object. 4. Returns the result of the _send_push_notification call. URL: /web-push/send-test Method: POST Returns: - A JSON response with the result of the _send_push_notification call. Notes: - The incoming request should contain the parameters "endpoint", "p256dh", and "auth". """ # Retrieving the subscription information from the incoming request content = request.json endpoint = content['endpoint'] p256dh = content['keys']['p256dh'] auth = content['keys']['auth'] # Looking up the subscription in the database subscription = Subscription.query.filter_by(endpoint=endpoint, p256dh=p256dh, auth=auth).first() # If the subscription is found, call the _send_push_notification method if subscription: subscription_info = { "endpoint": subscription.endpoint, "keys": { "p256dh": subscription.p256dh, "auth": subscription.auth } } title = "Test Notification" if "title" in content: title = content['title'] message = "This is a test notification." if "message" in content: message = content['message'] vapid_key = VAPIDKey.query.filter_by(id=subscription.vapid_key_id).first() result = WebPushService._send_push_notification( subscription_info, {"title": title, "message": message}, vapid_key ) print(f"Test sent: {result['success']}") return jsonify(success=result["success"], message=result["message"]), 200 else: print(f"Test failed due to missing subscription. Request: {json.dumps(content)}") return jsonify({"success": False, "message": "Subscription not found"}), 404 web_push_service = WebPushService(app, "app")