You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

393 lines
15 KiB

"""
Environment variables:
- SQLALCHEMY_DATABASE_URI: path to sqlite file, starting with "sqlite:////"
- ADMIN_PASSWORD: password for admin user for sensitive endpoints, defaults to 'admin'
"""
from typing import Dict, Tuple, Union, Optional
from flask import Flask, request, jsonify, Response
from models import db, VAPIDKey, Subscription
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from pywebpush import webpush, WebPushException
import base64
import datetime
import json
import os
import threading
import time
CONTACT_EMAIL = "mailto:info@timesafari.app"
app = Flask(__name__)
class WebPushService():
"""
This class provides services for sending web push notifications.
"""
def __init__(self, app, config_name: str) -> None:
"""
Initializes the WebPushService with the given application and configuration name.
Args:
- app: The application instance where the service will be attached.
- config_name (str): The name of the configuration to be used.
Attributes:
- app: The application instance.
- daily_notification_thread (threading.Thread): A thread to send daily notifications.
"""
# Setting the application instance
self.app = app
self.app.add_url_rule('/web-push/regenerate_vapid', view_func=self.regenerate_vapid, methods=['POST'])
# Setting the database URI for the application
db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:////app/instance/data/webpush.db')
# This relative path works in docker-compose
#db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:///data/webpush.db')
self.app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
# Initializing the database with the application
db.init_app(self.app)
# Creating a context for the application and initializing services
with self.app.app_context():
self._initialize()
# Creating and starting a thread to send daily notifications
self.daily_notification_thread = threading.Thread(target=self._send_daily_notifications)
self.daily_notification_thread.start()
def _generate_and_save_vapid_keys(self) -> None:
"""
Generates VAPID (Voluntary Application Server Identification) keys and saves them to the database.
The method generates a pair of public and private keys using the elliptic curve SECP256R1.
Both the public and private keys are then serialized and encoded in base64 format.
The keys are then stored in the database using a VAPIDKey model.
Notes:
- In case of any exception during the key generation or database operations, the error is printed to the console.
Raises:
- Exceptions raised by the key generation or database operations are caught and printed.
"""
try:
# Generating a private key using the elliptic curve SECP256R1
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
# Serializing and encoding the public key to base64 format
public_key_bytes = private_key.public_key().public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint
)
public_key_base64 = base64.b64encode(public_key_bytes).decode()
# Serializing and encoding the private key to base64 format
private_key_bytes = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
private_key_base64 = base64.b64encode(private_key_bytes).decode()
# Saving the keys to the database
key = VAPIDKey(public_key=public_key_base64, private_key=private_key_base64)
db.session.add(key)
db.session.commit()
except Exception as e:
print(f"Error generating VAPID keys: {e}")
def _initialize(self) -> None:
"""
Initializes the WebPushService by checking for the presence of VAPID keys in the database.
If no VAPID keys are found in the database, this method triggers the generation and storage
of new VAPID keys by invoking the `_generate_and_save_vapid_keys` method.
Notes:
- VAPID (Voluntary Application Server Identification) keys are essential for sending push notifications
to web clients, hence the check and generation if they don't exist.
"""
# Checking if there are any VAPID keys in the database
if not VAPIDKey.query.first():
# Generating and saving VAPID keys if none are found
self._generate_and_save_vapid_keys()
@staticmethod
def _send_push_notification(subscription_info: Dict, message: Dict, vapid_key: VAPIDKey) -> bool:
"""
Sends a push notification using the provided subscription information, message, and VAPID key.
Args:
- subscription_info (Dict): The information required to send the push notification to a specific client.
- message (Dict): The actual message content to be sent as the push notification.
- vapid_key (VAPIDKey): The VAPID key model instance containing the private key used for sending the notification.
Returns:
- bool: True if the push notification was sent successfully, False otherwise.
Notes:
- The `webpush` function is used to send the notification.
- In case of any exception, especially a WebPushException, the error is printed to the console.
"""
# Sending the push notification using the webpush function
try:
webpush(
subscription_info=subscription_info,
data=json.dumps(message),
vapid_private_key=vapid_key.private_key,
vapid_claims={"sub": CONTACT_EMAIL}
)
return True
except WebPushException as ex:
print(f"Failed to send push notification for {subscription_info['endpoint']} -- {ex}")
return False
def _send_daily_notifications(self) -> None:
"""
Continuously sends daily push notifications to all subscribed clients.
This method:
1. Retrieves all subscription data from the database.
2. Constructs the push notification message.
3. Sends a push notification to each subscribed client.
4. Sleeps for 24 hours before repeating the process.
Notes:
- The method runs in an infinite loop, meaning it will keep sending notifications until the program is terminated.
- The notifications are sent using the `_send_push_notification` method.
- A context for the application is created to enable database operations.
- The message content for the daily update is hardcoded in this method.
"""
while True:
now = datetime.datetime.now().isoformat()
print(f"{now} - Starting to send subscriptions...")
# Creating a context for the application to enable database operations
with self.app.app_context():
# Retrieving all subscription data from the database
all_subscriptions = Subscription.query.all()
# Retrieving the VAPID key from the database
vapid_key = VAPIDKey.query.first()
# Constructing the push notification message
message = {"title": "Daily Update", "message": f"Update for {now}"}
# Sending a push notification to each subscribed client
for subscription in all_subscriptions:
subscription_info = {
"endpoint": subscription.endpoint,
"keys": {
"p256dh": subscription.p256dh,
"auth": subscription.auth
}
}
WebPushService._send_push_notification(subscription_info, message, vapid_key)
print(f"{now} - Finished sending {len(all_subscriptions)} subscriptions.")
# Sleeping for 24 hours before sending the next set of notifications
time.sleep(24 * 60 * 60)
# This is an endpoint, routed in __init__
def regenerate_vapid(self) -> Tuple[str, int]:
"""
Endpoint to regenerate VAPID keys.
This method:
1. Deletes the current VAPID keys from the database.
2. Generates and stores new VAPID keys using the `_generate_and_save_vapid_keys` method.
URL: /web-push/regenerate_vapid
Method: POST
Header: Authentication: Basic ...
Returns:
- Tuple[str, int]: A JSON response indicating the success or failure of the operation, along with the appropriate HTTP status code.
Notes:
- If the operation is successful, a JSON response with a success message is returned with a 200 status code.
- If there's an error during the operation, a JSON response with the error message is returned with a 500 status code.
"""
# This default can be invoked thus: curl -X POST -H "Authorization: Basic YWRtaW46YWRtaW4=" localhost:3000/web-push/regenerate_vapid
envPassword = os.getenv('ADMIN_PASSWORD', 'admin')
auth = request.authorization
if (auth is None
or auth.username is None
or auth.username != 'admin'
or auth.password is None
or auth.password != envPassword):
return (
jsonify(error='Wrong password'),
401,
{'WWW-Authenticate': 'Basic realm="Login Required"'}
)
# Creating a context for the application to enable database operations
try:
with self.app.app_context():
# Deleting the current VAPID keys from the database
VAPIDKey.query.delete()
db.session.commit()
# Generating and saving new VAPID keys
self._generate_and_save_vapid_keys()
return jsonify(success=True, message="VAPID keys regenerated successfully"), 200
except Exception as e:
return jsonify(error=f'Error regenerating VAPID keys: {str(e)}'), 500
@staticmethod
@app.route('/web-push/vapid')
def vapid() -> Response:
"""
Endpoint to retrieve the current VAPID public key.
This method fetches the VAPID public key from the database and returns it in a JSON response.
URL: /web-push/vapid
Method: GET
Returns:
- Response: A JSON response containing the VAPID public key.
Notes:
- The response contains a key "vapidKey" with the associated public key as its value.
"""
# Retrieving the VAPID key from the database
key = VAPIDKey.query.first()
# Returning the public key in a JSON response
return jsonify(vapidKey=key.public_key)
@staticmethod
@app.route('/web-push/subscribe', methods=['POST'])
def subscribe() -> Tuple[str, int]:
"""
Endpoint to handle new web push subscription requests.
This method:
1. Retrieves the VAPID key from the database.
2. Reads the subscription content from the incoming request.
3. Saves the subscription data to the database.
4. Sends a confirmation push notification to the new subscriber.
URL: /web-push/subscribe
Method: POST
Returns:
- Tuple[str, int]: A JSON response indicating the success or failure of the operation, along with the appropriate HTTP status code.
Notes:
- If the operation is successful, a confirmation push notification is sent to the subscriber with a success message.
- If there are no VAPID keys available, an error message is returned with a 500 status code.
- There's a hardcoded 5-second sleep after saving the subscription, which might be intended for some delay before sending the confirmation. Ensure this is the desired behavior.
"""
# Retrieving the content from the incoming request
content = request.json
# Retrieving the VAPID key from the database
vapid_key = VAPIDKey.query.first()
# Checking if the VAPID key is available
if not vapid_key:
return jsonify(success=False, error="No VAPID keys available"), 500
# Creating a new Subscription instance with the provided data
subscription = Subscription(endpoint=content['endpoint'],
p256dh=content['keys']['p256dh'],
auth=content['keys']['auth'],
vapid_key_id=vapid_key.id)
# Saving the subscription data to the database
db.session.add(subscription)
db.session.commit()
# Introducing a delay (ensure that gateway endpoint is available)
time.sleep(10)
# Constructing the subscription information for the push notification
subscription_info = {
"endpoint": subscription.endpoint,
"keys": {
"p256dh": subscription.p256dh,
"auth": subscription.auth
}
}
# Creating a confirmation message for the push notification
message = {"title": "Subscription Successful", "message": "Thank you for subscribing!"}
# Sending the confirmation push notification
success = WebPushService._send_push_notification(subscription_info, message, vapid_key)
# Returning the operation status
return jsonify(success=success)
@staticmethod
@app.route('/web-push/unsubscribe', methods=['POST'])
def unsubscribe() -> Tuple[str, int]:
"""
Endpoint to handle web push unsubscription requests.
This method:
1. Reads the endpoint from the incoming request.
2. Searches for the subscription in the database using the endpoint.
3. If found, deletes the subscription from the database.
URL: /web-push/unsubscribe
Method: POST
Returns:
- Tuple[str, int]: A JSON response indicating the success or failure of the operation, along with the appropriate HTTP status code.
Notes:
- If the unsubscription is successful, a JSON response with a success message is returned.
- If the subscription is not found in the database, an error message is returned with a 404 status code.
"""
# Retrieving the endpoint from the incoming request
content = request.json
endpoint = content['endpoint']
# Searching for the subscription in the database using the endpoint
subscription = Subscription.query.filter_by(endpoint=endpoint).first()
# If the subscription is found, delete it from the database
if subscription:
db.session.delete(subscription)
db.session.commit()
return jsonify(success=True, message="Subscription deleted successfully")
# If the subscription is not found, return an error message
else:
return jsonify(success=False, error="Subscription not found"), 404
web_push_service = WebPushService(app, "app")