You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
412 lines
16 KiB
412 lines
16 KiB
"""
|
|
Environment variables:
|
|
- SQLALCHEMY_DATABASE_URI: path to sqlite file, starting with "sqlite:////"
|
|
- ADMIN_PASSWORD: password for admin user for sensitive endpoints, defaults to 'admin'
|
|
"""
|
|
|
|
from typing import Dict, Tuple, Union, Optional
|
|
from flask import Flask, request, jsonify, Response
|
|
from models import db, VAPIDKey, Subscription
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
from pywebpush import webpush, WebPushException
|
|
|
|
import base64
|
|
import datetime
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
|
|
CONTACT_EMAIL = "mailto:info@timesafari.app"
|
|
|
|
app = Flask(__name__)
|
|
|
|
class WebPushService():
|
|
"""
|
|
This class provides services for sending web push notifications.
|
|
"""
|
|
|
|
def __init__(self, app, config_name: str) -> None:
|
|
"""
|
|
Initializes the WebPushService with the given application and configuration name.
|
|
|
|
Args:
|
|
- app: The application instance where the service will be attached.
|
|
- config_name (str): The name of the configuration to be used.
|
|
|
|
Attributes:
|
|
- app: The application instance.
|
|
- daily_notification_thread (threading.Thread): A thread to send daily notifications.
|
|
"""
|
|
|
|
# Setting the application instance
|
|
self.app = app
|
|
self.app.add_url_rule('/web-push/regenerate_vapid', view_func=self.regenerate_vapid, methods=['POST'])
|
|
|
|
# Setting the database URI for the application
|
|
db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:////app/instance/data/webpush.db')
|
|
# This relative path works in docker-compose
|
|
#db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:///data/webpush.db')
|
|
self.app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
|
|
|
|
# Initializing the database with the application
|
|
db.init_app(self.app)
|
|
|
|
# Creating a context for the application and initializing services
|
|
with self.app.app_context():
|
|
self._initialize()
|
|
|
|
# Creating and starting a thread to send daily notifications
|
|
self.daily_notification_thread = threading.Thread(target=self._send_daily_notifications)
|
|
self.daily_notification_thread.start()
|
|
|
|
|
|
def _generate_and_save_vapid_keys(self) -> None:
|
|
"""
|
|
Generates VAPID (Voluntary Application Server Identification) keys and saves them to the database.
|
|
|
|
The method generates a pair of public and private keys using the elliptic curve SECP256R1.
|
|
Both the public and private keys are then serialized and encoded in base64 format.
|
|
The keys are then stored in the database using a VAPIDKey model.
|
|
|
|
Notes:
|
|
- In case of any exception during the key generation or database operations, the error is printed to the console.
|
|
|
|
Raises:
|
|
- Exceptions raised by the key generation or database operations are caught and printed.
|
|
"""
|
|
try:
|
|
# Generating a private key using the elliptic curve SECP256R1
|
|
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
|
|
|
|
# Serializing and encoding the public key to base64 format
|
|
public_key_bytes = private_key.public_key().public_bytes(
|
|
encoding=serialization.Encoding.X962,
|
|
format=serialization.PublicFormat.UncompressedPoint
|
|
)
|
|
public_key_base64 = base64.b64encode(public_key_bytes).decode()
|
|
|
|
# Serializing and encoding the private key to base64 format
|
|
private_key_bytes = private_key.private_bytes(
|
|
encoding=serialization.Encoding.DER,
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
)
|
|
private_key_base64 = base64.b64encode(private_key_bytes).decode()
|
|
|
|
# Saving the keys to the database
|
|
key = VAPIDKey(public_key=public_key_base64, private_key=private_key_base64)
|
|
db.session.add(key)
|
|
db.session.commit()
|
|
|
|
except Exception as e:
|
|
print(f"Error generating VAPID keys: {str(e)}")
|
|
raise e
|
|
|
|
|
|
def _initialize(self) -> None:
|
|
"""
|
|
Initializes the WebPushService by checking for the presence of VAPID keys in the database.
|
|
|
|
If no VAPID keys are found in the database, this method triggers the generation and storage
|
|
of new VAPID keys by invoking the `_generate_and_save_vapid_keys` method.
|
|
|
|
Notes:
|
|
- VAPID (Voluntary Application Server Identification) keys are essential for sending push notifications
|
|
to web clients, hence the check and generation if they don't exist.
|
|
"""
|
|
|
|
# Checking if there are any VAPID keys in the database
|
|
if not VAPIDKey.query.first():
|
|
|
|
# Generating and saving VAPID keys if none are found
|
|
self._generate_and_save_vapid_keys()
|
|
|
|
|
|
@staticmethod
|
|
def _send_push_notification(subscription_info: Dict, message: Dict, vapid_key: VAPIDKey) -> bool:
|
|
"""
|
|
Sends a push notification using the provided subscription information, message, and VAPID key.
|
|
|
|
Args:
|
|
- subscription_info (Dict): The information required to send the push notification to a specific client.
|
|
- message (Dict): The actual message content to be sent as the push notification.
|
|
- vapid_key (VAPIDKey): The VAPID key model instance containing the private key used for sending the notification.
|
|
|
|
Returns:
|
|
- request.Response https://requests.readthedocs.io/en/latest/api.html#requests.Response if the push notification was sent successfully
|
|
or False if there was an exception
|
|
|
|
Notes:
|
|
- The `webpush` function is used to send the notification.
|
|
- In case of any exception, especially a WebPushException, the error is printed to the console.
|
|
"""
|
|
|
|
# Sending the push notification using the webpush function
|
|
try:
|
|
result = webpush(
|
|
subscription_info=subscription_info,
|
|
data=json.dumps(message),
|
|
vapid_private_key=vapid_key.private_key,
|
|
vapid_claims={"sub": CONTACT_EMAIL}
|
|
)
|
|
time.sleep(1)
|
|
return {"success": True, "result": result}
|
|
|
|
except WebPushException as ex:
|
|
now = datetime.datetime.now().isoformat()
|
|
endpoint = subscription_info['endpoint']
|
|
print(f"{now}: Failed to send push notification for {endpoint} -- {ex}", flush=True)
|
|
|
|
unsubscribed_msg = '410 Gone'
|
|
if unsubscribed_msg in ex.args[0]:
|
|
subscription = Subscription.query.filter_by(endpoint=endpoint).first()
|
|
|
|
# Delete the subscription if found
|
|
if subscription:
|
|
db.session.delete(subscription)
|
|
db.session.commit()
|
|
print(f"Committed delete of {subscription_info}", flush=True)
|
|
else:
|
|
print(f"Could not find subscription at: {endpoint}", flush=True)
|
|
else:
|
|
print("Error other than unsubscribed/expired.", ex.args[0], flush=True)
|
|
|
|
return {"success": False, "message": ex.args[0]}
|
|
|
|
|
|
def _send_daily_notifications(self) -> None:
|
|
"""
|
|
Continuously sends daily push notifications to all subscribed clients.
|
|
|
|
This method:
|
|
1. Retrieves all subscription data from the database.
|
|
2. Constructs the push notification message.
|
|
3. Sends a push notification to each subscribed client.
|
|
4. Sleeps for 24 hours before repeating the process.
|
|
|
|
Notes:
|
|
- The method runs in an infinite loop, meaning it will keep sending notifications until the program is terminated.
|
|
- The notifications are sent using the `_send_push_notification` method.
|
|
- A context for the application is created to enable database operations.
|
|
- The message content for the daily update is hardcoded in this method.
|
|
"""
|
|
|
|
while True:
|
|
|
|
now = datetime.datetime.now().isoformat()
|
|
print(f"{now} - Starting to send subscriptions...", flush=True)
|
|
|
|
# Creating a context for the application to enable database operations
|
|
with self.app.app_context():
|
|
|
|
# Retrieving all subscription data from the database
|
|
all_subscriptions = Subscription.query.all()
|
|
|
|
# Retrieving the VAPID key from the database
|
|
vapid_key = VAPIDKey.query.first()
|
|
|
|
# Constructing the push notification message
|
|
message = {"title": "Daily Update", "message": f"Update for {now}"}
|
|
|
|
# Sending a push notification to each subscribed client
|
|
for subscription in all_subscriptions:
|
|
subscription_info = {
|
|
"endpoint": subscription.endpoint,
|
|
"keys": {
|
|
"p256dh": subscription.p256dh,
|
|
"auth": subscription.auth
|
|
}
|
|
}
|
|
WebPushService._send_push_notification(subscription_info, message, vapid_key)
|
|
print(f"{now} - Finished sending {len(all_subscriptions)} subscriptions.", flush=True)
|
|
|
|
# Sleeping for 24 hours before sending the next set of notifications
|
|
time.sleep(24 * 60 * 60)
|
|
|
|
|
|
# This is an endpoint, routed in __init__
|
|
def regenerate_vapid(self) -> Tuple[str, int]:
|
|
"""
|
|
Endpoint to regenerate VAPID keys.
|
|
|
|
This method:
|
|
1. Deletes the current VAPID keys from the database.
|
|
2. Generates and stores new VAPID keys using the `_generate_and_save_vapid_keys` method.
|
|
|
|
URL: /web-push/regenerate_vapid
|
|
Method: POST
|
|
Header: Authentication: Basic ...
|
|
|
|
Returns:
|
|
- tuple with "success" as True or False, and "message" message string
|
|
|
|
Notes:
|
|
- If the operation is successful, a JSON response with a success message is returned with a 200 status code.
|
|
- If there's an error during the operation, a JSON response with the error message is returned with a 500 status code.
|
|
"""
|
|
|
|
# This default can be invoked thus: curl -X POST -H "Authorization: Basic YWRtaW46YWRtaW4=" localhost:3000/web-push/regenerate_vapid
|
|
envPassword = os.getenv('ADMIN_PASSWORD', 'admin')
|
|
auth = request.authorization
|
|
if (auth is None
|
|
or auth.username is None
|
|
or auth.username != 'admin'
|
|
or auth.password is None
|
|
or auth.password != envPassword):
|
|
return (
|
|
jsonify(error='Wrong password'),
|
|
401,
|
|
{'WWW-Authenticate': 'Basic realm="Login Required"'}
|
|
)
|
|
|
|
# Creating a context for the application to enable database operations
|
|
try:
|
|
with self.app.app_context():
|
|
# Deleting the current VAPID keys from the database
|
|
VAPIDKey.query.delete()
|
|
db.session.commit()
|
|
|
|
# Generating and saving new VAPID keys
|
|
self._generate_and_save_vapid_keys()
|
|
|
|
return jsonify(success=True, message="VAPID keys regenerated successfully"), 200
|
|
|
|
except Exception as e:
|
|
return jsonify(success=False, message=f'Error regenerating VAPID keys: {str(e)}'), 500
|
|
|
|
|
|
@staticmethod
|
|
@app.route('/web-push/vapid')
|
|
def vapid() -> Response:
|
|
"""
|
|
Endpoint to retrieve the current VAPID public key.
|
|
|
|
This method fetches the VAPID public key from the database and returns it in a JSON response.
|
|
|
|
URL: /web-push/vapid
|
|
Method: GET
|
|
|
|
Returns:
|
|
- Response: A JSON response containing the VAPID public key.
|
|
|
|
Notes:
|
|
- The response contains a key "vapidKey" with the associated public key as its value.
|
|
"""
|
|
|
|
# Retrieving the VAPID key from the database
|
|
key = VAPIDKey.query.first()
|
|
|
|
# Returning the public key in a JSON response
|
|
return jsonify(vapidKey=key.public_key)
|
|
|
|
|
|
@staticmethod
|
|
@app.route('/web-push/subscribe', methods=['POST'])
|
|
def subscribe() -> Tuple[str, int]:
|
|
"""
|
|
Endpoint to handle new web push subscription requests.
|
|
|
|
This method:
|
|
1. Retrieves the VAPID key from the database.
|
|
2. Reads the subscription content from the incoming request.
|
|
3. Saves the subscription data to the database.
|
|
4. Sends a confirmation push notification to the new subscriber.
|
|
|
|
URL: /web-push/subscribe
|
|
Method: POST
|
|
|
|
Returns:
|
|
- A JSON response with "success" as True or False, "message" as a response message string, and potentially a "result" as a request.Response object from sending a test notification
|
|
|
|
Notes:
|
|
- If the operation is successful, a confirmation push notification is sent to the subscriber with a success message.
|
|
- If there are no VAPID keys available, an error message is returned with a 500 status code.
|
|
- There's a hardcoded 5-second sleep after saving the subscription, which might be intended for some delay before sending the confirmation. Ensure this is the desired behavior.
|
|
"""
|
|
|
|
# Retrieving the content from the incoming request
|
|
content = request.json
|
|
|
|
# Retrieving the VAPID key from the database
|
|
vapid_key = VAPIDKey.query.first()
|
|
|
|
# Checking if the VAPID key is available
|
|
if not vapid_key:
|
|
return jsonify(success=False, message="No VAPID keys available"), 500
|
|
|
|
# Creating a new Subscription instance with the provided data
|
|
subscription = Subscription(endpoint=content['endpoint'],
|
|
p256dh=content['keys']['p256dh'],
|
|
auth=content['keys']['auth'],
|
|
vapid_key_id=vapid_key.id)
|
|
|
|
# Saving the subscription data to the database
|
|
db.session.add(subscription)
|
|
db.session.commit()
|
|
|
|
# Introducing a delay (ensure that gateway endpoint is available)
|
|
time.sleep(10)
|
|
|
|
# Constructing the subscription information for the push notification
|
|
subscription_info = {
|
|
"endpoint": subscription.endpoint,
|
|
"keys": {
|
|
"p256dh": subscription.p256dh,
|
|
"auth": subscription.auth
|
|
}
|
|
}
|
|
|
|
# Creating a confirmation message for the push notification
|
|
message = {"title": "Subscription Successful", "message": "Thank you for subscribing!"}
|
|
|
|
# Sending the confirmation push notification
|
|
result = WebPushService._send_push_notification(subscription_info, message, vapid_key)
|
|
|
|
# Returning the operation status
|
|
return jsonify(success=result.ok, message=result.text, result=result)
|
|
|
|
|
|
@staticmethod
|
|
@app.route('/web-push/unsubscribe', methods=['POST'])
|
|
def unsubscribe() -> Tuple[str, int]:
|
|
"""
|
|
Endpoint to handle web push unsubscription requests.
|
|
|
|
This method:
|
|
1. Reads the endpoint from the incoming request.
|
|
2. Searches for the subscription in the database using the endpoint.
|
|
3. If found, deletes the subscription from the database.
|
|
|
|
URL: /web-push/unsubscribe
|
|
Method: POST
|
|
|
|
Returns:
|
|
- Tuple[str, int]: A JSON response indicating the success or failure of the operation, along with the appropriate HTTP status code.
|
|
|
|
Notes:
|
|
- If the unsubscription is successful, a JSON response with a success message is returned.
|
|
- If the subscription is not found in the database, an error message is returned with a 404 status code.
|
|
"""
|
|
|
|
# Retrieving the endpoint from the incoming request
|
|
content = request.json
|
|
endpoint = content['endpoint']
|
|
|
|
# Searching for the subscription in the database using the endpoint
|
|
subscription = Subscription.query.filter_by(endpoint=endpoint).first()
|
|
|
|
# If the subscription is found, delete it from the database
|
|
if subscription:
|
|
db.session.delete(subscription)
|
|
db.session.commit()
|
|
return jsonify(success=True, message="Subscription deleted successfully")
|
|
|
|
# If the subscription is not found, return an error message
|
|
else:
|
|
return jsonify(success=False, message="Subscription not found"), 404
|
|
|
|
|
|
web_push_service = WebPushService(app, "app")
|
|
|