"""
Web push notification service.

Exposes Flask endpoints for managing VAPID keys and browser push
subscriptions, and runs a background thread that sends a daily notification
to every stored subscription.

Environment variables:
- SQLALCHEMY_DATABASE_URI: path to sqlite file, starting with "sqlite:////"
- ADMIN_PASSWORD: password for admin user for sensitive endpoints, defaults to 'admin'
"""
import base64
import datetime
import hmac
import json
import os
import threading
import time
from typing import Any, Dict, Optional, Tuple, Union

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from flask import Flask, request, jsonify, Response
from pywebpush import webpush, WebPushException

from models import db, VAPIDKey, Subscription

CONTACT_EMAIL = "mailto:info@timesafari.app"

# What a Flask view in this module may return: a bare response, or a
# (response, status[, headers]) tuple.
ViewResult = Union[Response, Tuple[Any, ...]]

app = Flask(__name__)


class WebPushService():
    """
    This class provides services for sending web push notifications.
    """

    def __init__(self, app, config_name: str) -> None:
        """
        Initializes the WebPushService with the given application and configuration name.

        Args:
        - app: The application instance where the service will be attached.
        - config_name (str): The name of the configuration to be used.
          Accepted for interface compatibility; it is not read by this method.

        Attributes:
        - app: The application instance.
        - daily_notification_thread (threading.Thread): A thread to send daily notifications.
        """
        # Setting the application instance
        self.app = app
        self.app.add_url_rule(
            '/web-push/regenerate-vapid',
            view_func=self.regenerate_vapid,
            methods=['POST']
        )

        # Setting the database URI for the application
        db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:////app/instance/data/webpush.db')
        # This relative path works in docker-compose: 'sqlite:///data/webpush.db'
        self.app.config['SQLALCHEMY_DATABASE_URI'] = db_uri

        # Initializing the database with the application
        db.init_app(self.app)

        # Creating a context for the application and initializing services
        with self.app.app_context():
            self._initialize()

        # Creating and starting a thread to send daily notifications.
        # NOTE(review): this is a non-daemon thread running an infinite loop,
        # so it keeps the interpreter alive until the process is killed.
        self.daily_notification_thread = threading.Thread(target=self._send_daily_notifications)
        self.daily_notification_thread.start()

    def _generate_and_save_vapid_keys(self) -> None:
        """
        Generates VAPID (Voluntary Application Server Identification) keys and saves them
        to the database.

        A private key is generated on the elliptic curve SECP256R1. The public key is
        serialized as an uncompressed X9.62 point and the private key as unencrypted
        PKCS8 DER; both are base64-encoded and stored in a new VAPIDKey row, which is
        then committed. Any pending changes in the current session are committed with it.

        Must be called inside an application context.

        Raises:
        - Any exception from key generation or the database operations. The error is
          printed to the console and then re-raised unchanged.
        """
        try:
            # Generating a private key using the elliptic curve SECP256R1
            private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())

            # Serializing and encoding the public key to base64 format
            public_key_bytes = private_key.public_key().public_bytes(
                encoding=serialization.Encoding.X962,
                format=serialization.PublicFormat.UncompressedPoint
            )
            public_key_base64 = base64.b64encode(public_key_bytes).decode()

            # Serializing and encoding the private key to base64 format
            private_key_bytes = private_key.private_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            )
            private_key_base64 = base64.b64encode(private_key_bytes).decode()

            # Saving the keys to the database
            key = VAPIDKey(public_key=public_key_base64, private_key=private_key_base64)
            db.session.add(key)
            db.session.commit()
        except Exception as e:
            print(f"Error generating VAPID keys: {str(e)}")
            # Bare raise keeps the original traceback intact.
            raise

    def _initialize(self) -> None:
        """
        Initializes the WebPushService by checking for the presence of VAPID keys in the
        database.

        If no VAPID keys are found in the database, this method triggers the generation
        and storage of new VAPID keys by invoking `_generate_and_save_vapid_keys`.

        Must be called inside an application context.
        """
        # Generating and saving VAPID keys only if none are found
        if not VAPIDKey.query.first():
            self._generate_and_save_vapid_keys()

    @staticmethod
    def _subscription_info(subscription: Subscription) -> Dict[str, Any]:
        """
        Builds the `subscription_info` dict passed to `webpush` from a stored
        Subscription row (its endpoint, p256dh and auth attributes).
        """
        return {
            "endpoint": subscription.endpoint,
            "keys": {
                "p256dh": subscription.p256dh,
                "auth": subscription.auth
            }
        }

    @staticmethod
    def _subscription_fields(content: Any) -> Optional[Tuple[str, str, str]]:
        """
        Extracts (endpoint, p256dh, auth) from a decoded JSON request body.

        Expected shape: {"endpoint": str, "keys": {"p256dh": str, "auth": str}}.

        Returns:
        - The three values as a tuple, or None if the body is not a dict, "keys" is
          not a dict, or any of the three values is missing, empty or not a string.
        """
        if not isinstance(content, dict):
            return None
        keys = content.get('keys')
        if not isinstance(keys, dict):
            return None
        fields = (content.get('endpoint'), keys.get('p256dh'), keys.get('auth'))
        if not all(isinstance(value, str) and value for value in fields):
            return None
        return fields

    @staticmethod
    def _send_push_notification(subscription_info: Dict, message: Dict,
                                vapid_key: VAPIDKey) -> Dict[str, Any]:
        """
        Sends a push notification using the provided subscription information, message,
        and VAPID key.

        Args:
        - subscription_info (Dict): The endpoint and keys identifying the client.
        - message (Dict): The message content; it is JSON-encoded before sending.
        - vapid_key (VAPIDKey): The VAPID key model instance whose private key signs
          the request.

        Returns:
        - On a completed request: {"success": bool, "message": str, "result": response}
          where "success" is True only for HTTP status 201, "message" is the response
          text, and "result" is the response object returned by `webpush`.
        - On WebPushException: {"success": False, "message": str, "error": exception,
          "unsubscribed": bool}. "unsubscribed" is True when the push service reported
          410 Gone and the matching Subscription row was deleted.

        Notes:
        - Only WebPushException is handled; any other exception propagates to the caller.
        - Deleting a subscription uses `db.session`, so an application context is
          required on that path.
        """
        try:
            result = webpush(
                subscription_info=subscription_info,
                data=json.dumps(message),
                vapid_private_key=vapid_key.private_key,
                vapid_claims={"sub": CONTACT_EMAIL}
            )
            # "because sometimes that's what I had to do to make it work!" - Matthew
            time.sleep(1)
            return {
                "success": result.status_code == 201,
                "message": result.text,
                "result": result
            }

        except WebPushException as ex:
            now = datetime.datetime.now().isoformat()
            endpoint = subscription_info['endpoint']
            print(f"{now}: Failed to send push notification for {endpoint} -- {ex}", flush=True)

            error_message = ex.args[0] if ex.args else str(ex)

            # Detect "410 Gone" from the message text, and also from the attached
            # HTTP response when there is one. The response must be compared with
            # `is not None`: a requests.Response is falsy for any 4xx/5xx status.
            response = getattr(ex, 'response', None)
            gone = '410 Gone' in str(error_message) or (
                response is not None and getattr(response, 'status_code', None) == 410
            )

            unsubscribed = False
            if gone:
                subscription = Subscription.query.filter_by(endpoint=endpoint).first()

                # Delete the subscription if found
                if subscription:
                    db.session.delete(subscription)
                    db.session.commit()
                    print(f"Committed delete of {subscription_info}", flush=True)
                    unsubscribed = True
                else:
                    print(f"Could not find subscription at: {endpoint}", flush=True)
            else:
                print("Error other than unsubscribed/expired.", error_message, flush=True)

            return {
                "success": False,
                "message": error_message,
                "error": ex,
                "unsubscribed": unsubscribed
            }

    def _send_daily_notifications(self) -> None:
        """
        Continuously sends daily push notifications to all subscribed clients.

        Each iteration:
        1. Retrieves all subscriptions and the first VAPID key from the database.
        2. Constructs the (hardcoded) "Daily Update" message.
        3. Sends a push notification to each subscription via `_send_push_notification`.
        4. Sleeps for 24 hours before repeating the process.

        Notes:
        - The method runs in an infinite loop and never returns.
        - A failure on one subscription, or of a whole run, is printed and does not
          stop the loop; the next run still happens 24 hours later.
        - An application context is created for each run to enable database operations.
        """
        while True:
            now = datetime.datetime.now().isoformat()
            print(f"{now} - Starting to send subscriptions...", flush=True)

            try:
                # Creating a context for the application to enable database operations
                with self.app.app_context():
                    all_subscriptions = Subscription.query.all()
                    vapid_key = VAPIDKey.query.first()

                    if vapid_key is None:
                        print(f"{now} - No VAPID keys available; skipping this run.", flush=True)
                    else:
                        # Constructing the push notification message
                        message = {"title": "Daily Update", "message": f"Update for {now}"}

                        # Sending a push notification to each subscribed client
                        for subscription in all_subscriptions:
                            subscription_info = WebPushService._subscription_info(subscription)
                            try:
                                WebPushService._send_push_notification(
                                    subscription_info, message, vapid_key
                                )
                            except Exception as ex:
                                # Top-level boundary of the worker thread: one bad
                                # subscription must not abort the rest of the run.
                                print(
                                    f"{now}: Unexpected error sending to "
                                    f"{subscription_info['endpoint']} -- {ex}",
                                    flush=True
                                )

                        print(
                            f"{now} - Finished sending {len(all_subscriptions)} subscriptions.",
                            flush=True
                        )
            except Exception as ex:
                # Keep the thread alive so the next daily run still happens.
                print(f"{now} - Daily notification run failed -- {ex}", flush=True)

            # Sleeping for 24 hours before sending the next set of notifications
            time.sleep(24 * 60 * 60)

    # This is an endpoint, routed in __init__
    def regenerate_vapid(self) -> ViewResult:
        """
        Endpoint to regenerate VAPID keys.

        This method:
        1. Checks HTTP Basic credentials (user 'admin', password from ADMIN_PASSWORD).
        2. Deletes the current VAPID keys from the database.
        3. Generates and stores new VAPID keys using `_generate_and_save_vapid_keys`.

        Steps 2 and 3 are committed together; if generation fails the delete is rolled
        back so the existing keys are kept.

        URL: /web-push/regenerate-vapid
        Method: POST
        Header: Authorization: Basic ...

        Returns:
        - 200 with JSON {"success": true, "message": ...} on success.
        - 401 with JSON {"error": "Wrong password"} and a WWW-Authenticate header when
          the credentials are missing or wrong.
        - 500 with JSON {"success": false, "message": ...} if regeneration fails.
        """
        # This default can be invoked thus:
        # curl -X POST -H "Authorization: Basic YWRtaW46YWRtaW4=" localhost:3000/web-push/regenerate-vapid
        # NOTE(review): the fallback password 'admin' is insecure; set ADMIN_PASSWORD
        # in any real deployment.
        env_password = os.getenv('ADMIN_PASSWORD', 'admin')

        def matches(supplied: Optional[str], expected: str) -> bool:
            # Constant-time comparison; bytes are used because compare_digest
            # rejects non-ASCII str arguments.
            if supplied is None:
                return False
            return hmac.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8'))

        auth = request.authorization
        # Evaluate both checks unconditionally so timing does not reveal which failed.
        username_ok = auth is not None and matches(auth.username, 'admin')
        password_ok = auth is not None and matches(auth.password, env_password)
        if not (username_ok and password_ok):
            return (
                jsonify(error='Wrong password'),
                401,
                {'WWW-Authenticate': 'Basic realm="Login Required"'}
            )

        try:
            # Creating a context for the application to enable database operations
            with self.app.app_context():
                try:
                    # The delete stays uncommitted here; it is committed together with
                    # the new key inside _generate_and_save_vapid_keys.
                    VAPIDKey.query.delete()
                    self._generate_and_save_vapid_keys()
                except Exception:
                    # Restore the previous keys rather than leaving none at all.
                    db.session.rollback()
                    raise

            return jsonify(success=True, message="VAPID keys regenerated successfully"), 200

        except Exception as e:
            return jsonify(success=False, message=f'Error regenerating VAPID keys: {str(e)}'), 500

    @staticmethod
    @app.route('/web-push/vapid')
    def vapid() -> ViewResult:
        """
        Endpoint to retrieve the current VAPID public key.

        URL: /web-push/vapid
        Method: GET

        Returns:
        - JSON {"vapidKey": <public key>} for the first VAPID key in the database.
        - 500 with JSON {"success": false, "message": ...} if no VAPID key exists.
        """
        # Retrieving the VAPID key from the database
        key = VAPIDKey.query.first()
        if key is None:
            return jsonify(success=False, message="No VAPID keys available"), 500

        # Returning the public key in a JSON response
        return jsonify(vapidKey=key.public_key)

    @staticmethod
    @app.route('/web-push/subscribe', methods=['POST'])
    def subscribe() -> ViewResult:
        """
        Endpoint to handle new web push subscription requests.

        This method:
        1. Reads the subscription content from the incoming request.
        2. Retrieves the VAPID key from the database.
        3. Saves the subscription data to the database.
        4. Sends a confirmation push notification to the new subscriber.

        URL: /web-push/subscribe
        Method: POST
        Body: JSON {"endpoint": str, "keys": {"p256dh": str, "auth": str}}

        Returns:
        - JSON {"success": bool, "message": str} reflecting the result of sending the
          confirmation notification.
        - 400 with JSON error if the body is missing or malformed.
        - 500 with JSON error if there are no VAPID keys available.

        Notes:
        - There is a hardcoded 10-second sleep after saving the subscription and before
          sending the confirmation, so this request takes at least that long.
        """
        # Retrieving and validating the content from the incoming request
        fields = WebPushService._subscription_fields(request.get_json(silent=True))
        if fields is None:
            return jsonify(
                success=False,
                message="Request body must be JSON with 'endpoint', 'keys.p256dh' and 'keys.auth'"
            ), 400
        endpoint, p256dh, auth = fields

        # Retrieving the VAPID key from the database
        vapid_key = VAPIDKey.query.first()
        if not vapid_key:
            return jsonify(success=False, message="No VAPID keys available"), 500

        # Creating a new Subscription instance with the provided data
        subscription = Subscription(endpoint=endpoint,
                                    p256dh=p256dh,
                                    auth=auth,
                                    vapid_key_id=vapid_key.id)

        # Saving the subscription data to the database
        db.session.add(subscription)
        db.session.commit()

        # Introducing a delay (ensure that gateway endpoint is available)
        time.sleep(10)

        # Constructing the subscription information for the push notification
        subscription_info = WebPushService._subscription_info(subscription)

        # Creating a confirmation message for the push notification
        message = {"title": "Subscription Successful", "message": "Thank you for subscribing!"}

        # Sending the confirmation push notification
        result = WebPushService._send_push_notification(subscription_info, message, vapid_key)

        # Returning the operation status
        return jsonify(success=result["success"], message=result["message"])

    @staticmethod
    @app.route('/web-push/unsubscribe', methods=['POST'])
    def unsubscribe() -> ViewResult:
        """
        Endpoint to handle web push unsubscription requests.

        This method:
        1. Reads the endpoint from the incoming request.
        2. Searches for the subscription in the database using the endpoint.
        3. If found, deletes the subscription from the database.

        URL: /web-push/unsubscribe
        Method: POST
        Body: JSON {"endpoint": str}

        Returns:
        - JSON {"success": true, "message": ...} if the subscription was deleted.
        - 400 with JSON error if the body is missing or has no "endpoint".
        - 404 with JSON error if no subscription matches the endpoint.
        """
        # Retrieving the endpoint from the incoming request
        content = request.get_json(silent=True)
        endpoint = content.get('endpoint') if isinstance(content, dict) else None
        if not endpoint:
            return jsonify(
                success=False,
                message="Request body must be JSON with 'endpoint'"
            ), 400

        # Searching for the subscription in the database using the endpoint
        subscription = Subscription.query.filter_by(endpoint=endpoint).first()
        if not subscription:
            return jsonify(success=False, message="Subscription not found"), 404

        db.session.delete(subscription)
        db.session.commit()
        return jsonify(success=True, message="Subscription deleted successfully")

    @staticmethod
    @app.route('/web-push/send-test', methods=['POST'])
    def send_test() -> ViewResult:
        """
        Endpoint to send a test push notification to a specific client.

        This method:
        1. Retrieves the subscription information from the incoming request.
        2. Looks up the subscription in the database.
        3. Calls `_send_push_notification` with the VAPID key recorded on the subscription.
        4. Returns the result of that call.

        URL: /web-push/send-test
        Method: POST
        Body: JSON {"endpoint": str, "keys": {"p256dh": str, "auth": str}}, with
              optional "title" and "message" overriding the default test text.

        Returns:
        - JSON {"success": bool, "message": str} with the result of the send.
        - 400 with JSON error if the body is missing or malformed.
        - 404 with JSON error if no subscription matches all three values.
        - 500 with JSON error if the subscription's VAPID key no longer exists.
        """
        # Retrieving and validating the subscription information from the request
        content = request.get_json(silent=True)
        fields = WebPushService._subscription_fields(content)
        if fields is None:
            return jsonify(
                success=False,
                message="Request body must be JSON with 'endpoint', 'keys.p256dh' and 'keys.auth'"
            ), 400
        endpoint, p256dh, auth = fields

        # Looking up the subscription in the database
        subscription = Subscription.query.filter_by(
            endpoint=endpoint, p256dh=p256dh, auth=auth
        ).first()
        if not subscription:
            return jsonify({"success": False, "message": "Subscription not found"}), 404

        # The key row may be gone (e.g. after the keys were regenerated).
        vapid_key = VAPIDKey.query.filter_by(id=subscription.vapid_key_id).first()
        if vapid_key is None:
            return jsonify(
                success=False,
                message="No VAPID key available for this subscription"
            ), 500

        title = content.get('title', "Test Notification")
        message = content.get('message', "This is a test notification.")

        result = WebPushService._send_push_notification(
            WebPushService._subscription_info(subscription),
            {"title": title, "message": message},
            vapid_key
        )

        return jsonify(success=result["success"], message=result["message"])


web_push_service = WebPushService(app, "app")