You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

582 lines
25 KiB

"""
Environment variables:
- SQLALCHEMY_DATABASE_URI: path to sqlite file, starting with "sqlite:////"
- ADMIN_PASSWORD: password for admin user for sensitive endpoints, defaults to 'admin'
"""
import base64
import datetime
import hmac
import json
import os
import threading
import time
from typing import Any, Dict, Tuple

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from flask import Flask, request, jsonify, Response
from pywebpush import webpush, WebPushException
from sqlalchemy import and_

from models import db, VAPIDKey, Settings, Subscription
# Contact URI passed as the "sub" claim in the VAPID claims when a push is sent.
CONTACT_EMAIL = "mailto:info@timesafari.app"
# Module-level Flask application; the WebPushService endpoints are registered on it.
app = Flask(__name__)
class WebPushService():
    """
    This class provides services for sending web push notifications.

    On construction it registers the web-push endpoints on the Flask application, makes
    sure VAPID keys exist in the database, and starts a background thread that
    periodically sends the "DAILY_CHECK" notification to the subscriptions that are due.
    """

    # ISO-format timestamp of the latest notification pass that finished without raising;
    # stays None until the first such pass. Reported by the `ping` endpoint.
    latest_subscription_run = None

    def __init__(self, app, config_name: str) -> None:
        """
        Initializes the WebPushService with the given application and configuration name.

        Args:
        - app: The application instance where the service will be attached.
        - config_name (str): The name of the configuration to be used.
          It is accepted for compatibility with callers but is not read by this method.

        Attributes:
        - app: The application instance.
        - daily_notification_thread (threading.Thread): A thread to send daily notifications.
        """
        # Setting the application instance
        self.app = app
        # These two endpoints are bound methods, so they are routed here instead of with a decorator
        self.app.add_url_rule('/web-push/regenerate-vapid', view_func=self.regenerate_vapid, methods=['POST'])
        self.app.add_url_rule('/web-push/ping', view_func=self.ping, methods=['GET'])

        # Setting the database URI for the application
        db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:////app/instance/data/webpush.db')
        # This relative path works in docker-compose
        # This relative path works on a local run if you link to this dir from "var/app-instance"
        #db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:///data/webpush.db')
        self.app.config['SQLALCHEMY_DATABASE_URI'] = db_uri

        # Initializing the database with the application
        db.init_app(self.app)

        # Creating a context for the application and initializing services
        with self.app.app_context():
            self._initialize()

        # Creating and starting a thread to send daily notifications
        self.daily_notification_thread = threading.Thread(target=self._send_daily_notifications)
        self.daily_notification_thread.start()

    def _generate_and_save_vapid_keys(self) -> None:
        """
        Generates VAPID (Voluntary Application Server Identification) keys and saves them to the database.

        The method generates a private key on the elliptic curve SECP256R1. The public key
        (X9.62 uncompressed point) and the private key (unencrypted PKCS8 DER) are each
        encoded in base64 and stored in the database using a VAPIDKey model.

        Notes:
        - The callers in this class invoke it inside an application context, because it uses
          the database session.

        Raises:
        - Any exception raised by the key generation or the database operations. The error is
          printed to the console, the session is rolled back (discarding any uncommitted
          changes in it), and the exception is re-raised to the caller.
        """
        try:
            # Generating a private key using the elliptic curve SECP256R1
            private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())

            # Serializing and encoding the public key to base64 format
            public_key_bytes = private_key.public_key().public_bytes(
                encoding=serialization.Encoding.X962,
                format=serialization.PublicFormat.UncompressedPoint
            )
            public_key_base64 = base64.b64encode(public_key_bytes).decode()

            # Serializing and encoding the private key to base64 format
            private_key_bytes = private_key.private_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            )
            private_key_base64 = base64.b64encode(private_key_bytes).decode()

            # Saving the keys to the database
            key = VAPIDKey(public_key=public_key_base64, private_key=private_key_base64)
            db.session.add(key)
            db.session.commit()
        except Exception as e:
            print(f"Error generating VAPID keys: {str(e)}")
            # Leave the session usable and drop any half-done work before propagating
            db.session.rollback()
            raise

    def _initialize(self) -> None:
        """
        Initializes the WebPushService by checking for the presence of VAPID keys in the database.

        If no VAPID keys are found in the database, this method triggers the generation and storage
        of new VAPID keys by invoking the `_generate_and_save_vapid_keys` method.

        Notes:
        - VAPID (Voluntary Application Server Identification) keys are essential for sending push notifications
          to web clients, hence the check and generation if they don't exist.
        """
        # Checking if there are any VAPID keys in the database
        if not VAPIDKey.query.first():
            # Generating and saving VAPID keys if none are found
            self._generate_and_save_vapid_keys()

    @staticmethod
    def _send_push_notification(subscription_info: Dict, message: Dict, vapid_key: VAPIDKey) -> Dict[str, Any]:
        """
        Sends a push notification using the provided subscription information, message, and VAPID key.

        Args:
        - subscription_info (Dict): The information required to send the push notification to a specific client
          ("endpoint" plus "keys" with "p256dh" and "auth").
        - message (Dict): The actual message content to be sent as the push notification (serialized as JSON).
        - vapid_key (VAPIDKey): The VAPID key model instance containing the private key used for sending the notification.

        Returns Dict with the following keys
        - success: True if the push service answered with status 201, False otherwise
        - message: a string message with the resulting text, usually nothing on success
        - result: (only when no WebPushException was raised) the response object returned by `webpush`
        - error: (only on WebPushException) the exception instance
        - unsubscribed: (only on WebPushException) True if the failure was "410 Gone" and the matching
          subscription was found and deleted from the database

        Notes:
        - The `webpush` function is used to send the notification.
        - Only WebPushException is handled here; any other exception propagates to the caller.
        - On "410 Gone" the database is accessed, so callers invoke this inside an application context.
        """
        # Sending the push notification using the webpush function
        try:
            result = webpush(
                subscription_info=subscription_info,
                data=json.dumps(message),
                vapid_private_key=vapid_key.private_key,
                vapid_claims={"sub": CONTACT_EMAIL}
            )
            # "because sometimes that's what I had to do to make it work!" - Matthew
            time.sleep(1)
            return {"success": result.status_code == 201, "message": result.text, "result": result}
        except WebPushException as ex:
            now = datetime.datetime.now().isoformat()
            endpoint = subscription_info['endpoint']
            print(f"{now}: Failed to send push notification for {endpoint} -- {ex}", flush=True)

            unsubscribed_msg = '410 Gone'
            unsubscribed = False
            if unsubscribed_msg in ex.args[0]:
                # The push service says this subscription no longer exists, so stop tracking it
                subscription = Subscription.query.filter_by(endpoint=endpoint).first()
                # Delete the subscription if found
                if subscription:
                    db.session.delete(subscription)
                    db.session.commit()
                    print(f"Committed delete of {subscription_info}", flush=True)
                    unsubscribed = True
                else:
                    print(f"Could not find subscription at: {endpoint}", flush=True)
            else:
                print("Error other than unsubscribed/expired.", ex.args[0], flush=True)

            return {"success": False, "message": str(ex), "error": ex, "unsubscribed": unsubscribed}

    @staticmethod
    def _compute_notify_window(
        prev_notify_end_time: datetime.datetime,
        now: datetime.datetime
    ) -> Tuple[datetime.datetime, str, str]:
        """
        Works out which 'HH:MM' range of notify_time values one pass has to cover.

        Args:
        - prev_notify_end_time: the end time recorded by the previous completed pass.
        - now: the time at which the current pass started.

        Returns a tuple (end_time, start_minute, end_minute) where
        - end_time is the datetime to record as the end of this pass
        - start_minute is the inclusive lower 'HH:MM' bound
        - end_minute is the exclusive upper 'HH:MM' bound
        """
        midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
        this_minute = now.replace(second=0, microsecond=0)

        # if it's before midnight this morning, catch us up to the beginning of today:
        if prev_notify_end_time < midnight_today:
            # make the start time the later of: prev_notify_end_time or yesterday at this time
            # (if the current time is later in the day, use that
            # because the next run will pick up everything from midnight until now today)
            start_time = max(prev_notify_end_time, this_minute - datetime.timedelta(days=1))
            # the end time is right at midnight at the beginning of today, but
            # we'd never catch anything if we used a non-zero start_minute and "00:00" as the end_minute,
            # so the string bound is '23:60' ... gotta catch "23:59", too
            return midnight_today, start_time.strftime('%H:%M'), '23:60'

        # the start time is OK, and the end time is now
        return this_minute, prev_notify_end_time.strftime('%H:%M'), this_minute.strftime('%H:%M')

    def _run_notification_pass(self, now: datetime.datetime) -> None:
        """
        Runs one notification pass: sends "DAILY_CHECK" to every subscription that became due
        since the previous pass. The caller provides the application context.

        Args:
        - now: the time at which this pass started.

        Notes:
        - `Settings.running_notify_end_time` is used as an "in progress" marker. It is set before
          sending and cleared afterwards. If sending fails, the marker is cleared again before the
          exception propagates; otherwise it would stay set and every later pass would refuse to run.
        - After a failed pass `prev_notify_end_time` is not advanced, so the next pass covers the
          same window again and some clients may get the notification twice.
        """
        # Retrieve the VAPID key from the database
        vapid_key = VAPIDKey.query.first()

        # Construct the push notification message
        # The title value is a key, triggering the device to apply logic and customize both title and message.
        # See https://gitea.anomalistdesign.com/trent_larson/crowd-funder-for-time-pwa/src/commit/1e6159869fc28ca6e6b5b3d186617d75705100b4/sw_scripts/additional-scripts.js#L65
        UPDATE_TITLE = "DAILY_CHECK"
        message = {"title": UPDATE_TITLE, "message": f"Update for {now}"}

        # Determine the beginning and end time to check for subscriptions
        settings = Settings.query.first()
        if settings.running_notify_end_time is not None:
            print(f"{now} - Stopped because we're already running a notification check.", flush=True)
            return

        # set all subscriptions without a time to the current time
        # (these won't be picked-up in the current run, since the current minute is the end minute)
        Subscription.query.filter_by(notify_time=None).update({Subscription.notify_time: now.strftime('%H:%M')})

        # Storing the HH:MM for the desired notification time isn't a bad idea.
        # However, the logic that compares directly to it is a bit complicated.
        # It would be more straightforward to save the next notification time in each record
        # and then update that every time this process runs. It would require raw SQL with
        # some calculations and many DB updates each day, but the logic would be more clear.

        # get the previous notify end time from the DB as a datetime
        prev_notify_end_time = datetime.datetime.fromisoformat(settings.prev_notify_end_time)
        end_time, start_minute, end_minute = WebPushService._compute_notify_window(prev_notify_end_time, now)

        # This really should update & continue only if the running_notify_end_time is still None,
        # just in case another thread started.
        settings.running_notify_end_time = end_time.isoformat()
        db.session.commit()

        # this check was generated by Copilot; it's probably unnecessary
        if settings.running_notify_end_time != end_time.isoformat():
            print(f"{now} - Failed to update running_notify_end_time", flush=True)
            return

        try:
            # get all the subscriptions that have a notify_time between start_minute inclusive and end_minute exclusive
            all_subscriptions = Subscription.query.filter(
                and_(Subscription.notify_time >= start_minute, Subscription.notify_time < end_minute)
            )

            # Send a push notification to each subscribed client
            num_subscriptions = all_subscriptions.count()
            for subscription in all_subscriptions:
                subscription_info = {
                    "endpoint": subscription.endpoint,
                    "keys": {
                        "p256dh": subscription.p256dh,
                        "auth": subscription.auth
                    }
                }
                result = WebPushService._send_push_notification(subscription_info, message, vapid_key)
                print(
                    f"Result from sub {subscription.id}: success={result['success']} text={result['message']}",
                    flush=True
                )

            settings.prev_notify_end_time = end_time.isoformat()
            settings.running_notify_end_time = None
            db.session.commit()
        except Exception:
            # Release the "in progress" marker so that later passes are not blocked forever
            db.session.rollback()
            settings.running_notify_end_time = None
            db.session.commit()
            raise

        print(f"{now} - Finished sending {num_subscriptions} subscriptions.", flush=True)

    def _send_daily_notifications(self) -> None:
        """
        Continuously sends the daily push notification to the subscribed clients that are due.

        Every 5 minutes this method:
        1. Creates a context for the application to enable database operations.
        2. Runs one notification pass with `_run_notification_pass`, which selects the subscriptions
           whose notify_time falls between the previous pass and now and sends each one a push
           notification using the `_send_push_notification` method.
        3. Records the start time of the pass in `latest_subscription_run` if the pass did not raise.

        Notes:
        - The method runs in an infinite loop, meaning it will keep sending notifications until the program is terminated.
        - The message content for the daily update is hardcoded in `_run_notification_pass`.
        - An exception raised by a pass is printed and the loop carries on with the next pass,
          so one failure does not end all future notifications.
        """
        while True:
            now = datetime.datetime.now()
            print(f"{now} - Starting to send subscriptions...", flush=True)

            try:
                # Creating a context for the application to enable database operations
                with self.app.app_context():
                    self._run_notification_pass(now)
            except Exception as ex:
                # Top-level boundary of the worker thread: report the problem and keep the loop alive
                print(f"{now} - Notification run failed: {ex}", flush=True)
            else:
                self.latest_subscription_run = now.isoformat()

            # Sleep before repeating
            time.sleep(5 * 60)

    # This is an endpoint, routed in __init__
    def ping(self) -> str:
        """
        Endpoint to show liveness info

        URL: /web-push/ping
        Method: GET

        Returns:
        - Text with the time of the latest subscription run (None if there has not been one)
        """
        return f"pong ... with latest subscription run at {self.latest_subscription_run}"

    # This is an endpoint, routed in __init__
    def regenerate_vapid(self) -> Tuple[Response, int, dict[str, str]] | Tuple[Response, int]:
        """
        Endpoint to regenerate VAPID keys.

        This method:
        1. Checks the HTTP Basic credentials: user "admin" with the ADMIN_PASSWORD environment value
           (which defaults to 'admin').
        2. Deletes the current VAPID keys from the database.
        3. Generates and stores new VAPID keys using the `_generate_and_save_vapid_keys` method.

        URL: /web-push/regenerate-vapid
        Method: POST
        Header: Authorization: Basic ...

        Returns:
        - Tuple of a JSON response with "success" as True or False and a "message" string, and the status code
        - On bad credentials: a JSON response with "error", status 401, and a WWW-Authenticate header

        Notes:
        - If the operation is successful, a JSON response with a success message is returned with a 200 status code.
        - If there's an error during the operation, a JSON response with the error message is returned with a 500 status code.
        - The delete and the insert are committed together, so a failed generation keeps the previous keys.
        """
        # This default can be invoked thus: curl -X POST -H "Authorization: Basic YWRtaW46YWRtaW4=" localhost:3000/web-push/regenerate-vapid
        env_password = os.getenv('ADMIN_PASSWORD', 'admin')
        auth = request.authorization
        authorized = (
            auth is not None
            and auth.username == 'admin'
            and auth.password is not None
            # compare in constant time so that response timing does not leak the password
            and hmac.compare_digest(auth.password.encode('utf-8'), env_password.encode('utf-8'))
        )
        if not authorized:
            return (
                jsonify(error='Wrong password'),
                401,
                {'WWW-Authenticate': 'Basic realm="Login Required"'}
            )

        try:
            # Creating a context for the application to enable database operations
            with self.app.app_context():
                # Deleting the current VAPID keys from the database
                # (not committed on its own: the commit happens together with the insert of the new
                # key, and a failure there rolls the delete back)
                VAPIDKey.query.delete()

                # Generating and saving new VAPID keys
                self._generate_and_save_vapid_keys()

            return jsonify(success=True, message="VAPID keys regenerated successfully"), 200
        except Exception as e:
            return jsonify(success=False, message=f'Error regenerating VAPID keys: {str(e)}'), 500

    @staticmethod
    @app.route('/web-push/vapid')
    def vapid() -> Response | Tuple[Response, int]:
        """
        Endpoint to retrieve the current VAPID public key.

        This method fetches the VAPID public key from the database and returns it in a JSON response.

        URL: /web-push/vapid
        Method: GET

        Returns:
        - Response: A JSON response containing the VAPID public key.

        Notes:
        - The response contains a key "vapidKey" with the associated public key as its value.
        - If there are no VAPID keys available, an error message is returned with a 500 status code.
        """
        # Retrieving the VAPID key from the database
        key = VAPIDKey.query.first()
        if key is None:
            return jsonify(success=False, message="No VAPID keys available"), 500

        # Returning the public key in a JSON response
        return jsonify(vapidKey=key.public_key)

    @staticmethod
    def _is_clock_value(value: Any, upper: int) -> bool:
        """Tells whether value is an integer (and not a bool) that is at least 0 and below upper."""
        return isinstance(value, int) and not isinstance(value, bool) and 0 <= value < upper

    @staticmethod
    @app.route('/web-push/subscribe', methods=['POST'])
    def subscribe() -> Tuple[Response, int]:
        """
        Endpoint to handle new web push subscription requests.

        This method:
        1. Retrieves the VAPID key from the database.
        2. Reads the subscription content from the incoming request.
        3. Saves the subscription data to the database.
        4. Sends a confirmation push notification to the new subscriber.

        URL: /web-push/subscribe
        Method: POST
        Body: JSON with "endpoint", "keys" ("p256dh" and "auth"), and optionally "notifyTime" with
              "utcHour" (0-23) and optionally "minute" (0-59, defaults to 0)

        Returns:
        - A JSON response with "success" as True or False and "message" as a response message string,
          both taken from the result of sending the confirmation notification

        Notes:
        - If the subscription is saved, the status code is 200 even if the confirmation notification failed;
          check "success" for the outcome of the notification.
        - If there are no VAPID keys available, an error message is returned with a 500 status code.
        - If required fields are missing or the notify time is invalid, an error message is returned with a 400 status code.
        - Without a "notifyTime" that has a "utcHour", the notification time is "13:13".
        """
        # Retrieving the content from the incoming request
        content = request.json

        # Retrieving the VAPID key from the database
        vapid_key = VAPIDKey.query.first()

        # Checking if the VAPID key is available
        if not vapid_key:
            return jsonify(success=False, message="No VAPID keys available"), 500

        # Checking that the subscription fields are there
        try:
            endpoint = content['endpoint']
            p256dh = content['keys']['p256dh']
            auth = content['keys']['auth']
        except (KeyError, TypeError):
            return jsonify(success=False, message="Missing 'endpoint', 'keys.p256dh', or 'keys.auth'"), 400

        # Constructing the notify_time string
        notify_time = "13:13"  # random time that is in most people's waking hours (server time, typically UTC)
        notify_spec = content.get('notifyTime')
        if isinstance(notify_spec, dict) and 'utcHour' in notify_spec:
            notify_hour = notify_spec['utcHour']
            notify_minute = notify_spec.get('minute', 0)
            # anything else could not be formatted as 'HH:MM' or would never match a time of day
            if not (WebPushService._is_clock_value(notify_hour, 24)
                    and WebPushService._is_clock_value(notify_minute, 60)):
                return jsonify(success=False, message="Invalid 'notifyTime': expected utcHour 0-23 and minute 0-59"), 400
            notify_time = f"{notify_hour:02d}:{notify_minute:02d}"

        # Creating a new Subscription instance with the provided data
        subscription = Subscription(auth=auth,
                                    endpoint=endpoint,
                                    notify_time=notify_time,
                                    p256dh=p256dh,
                                    vapid_key_id=vapid_key.id)

        # Saving the subscription data to the database
        db.session.add(subscription)
        db.session.commit()

        # Introducing a delay (ensure that gateway endpoint is available)
        # ... which I'm now commenting out because there's no pending request so it doesn't make sense... we'll see if things still work
        #time.sleep(10)

        # Constructing the subscription information for the push notification
        subscription_info = {
            "endpoint": subscription.endpoint,
            "keys": {
                "p256dh": subscription.p256dh,
                "auth": subscription.auth
            }
        }

        # Creating a confirmation message for the push notification
        message = {"title": "Subscription Successful", "message": "Thank you for subscribing!"}

        # Sending the confirmation push notification
        result = WebPushService._send_push_notification(subscription_info, message, vapid_key)

        # Returning the operation status
        return jsonify(success=result["success"], message=result["message"]), 200

    @staticmethod
    @app.route('/web-push/unsubscribe', methods=['POST'])
    def unsubscribe() -> Tuple[Response, int]:
        """
        Endpoint to handle web push unsubscription requests.

        This method:
        1. Reads the endpoint from the incoming request.
        2. Searches for the subscription in the database using the endpoint.
        3. If found, deletes the subscription from the database.

        URL: /web-push/unsubscribe
        Method: POST
        Body: JSON with "endpoint"

        Returns:
        - A JSON response indicating the success or failure of the operation, along with the appropriate HTTP status code.

        Notes:
        - If the unsubscription is successful, a JSON response with a success message is returned.
        - If the subscription is not found in the database, an error message is returned with a 404 status code.
        - If the body has no "endpoint", an error message is returned with a 400 status code.
        """
        # Retrieving the endpoint from the incoming request
        content = request.json
        if not isinstance(content, dict) or 'endpoint' not in content:
            return jsonify(success=False, message="Missing 'endpoint'"), 400
        endpoint = content['endpoint']

        # Searching for the subscription in the database using the endpoint
        subscription = Subscription.query.filter_by(endpoint=endpoint).first()

        # If the subscription is not found, return an error message
        if not subscription:
            return jsonify(success=False, message="Subscription not found"), 404

        # The subscription is found, so delete it from the database
        db.session.delete(subscription)
        db.session.commit()
        return jsonify(success=True, message="Subscription deleted successfully"), 200

    @staticmethod
    @app.route('/web-push/send-test', methods=['POST'])
    def send_test() -> Tuple[Response, int]:
        """
        Endpoint to send a test push notification to a specific client.

        This method:
        1. Retrieves the subscription information from the incoming request.
        2. Looks up the subscription in the database.
        3. Calls the _send_push_notification method to send a test push notification.
           - The notification will include the "title" and "message" if supplied in the body object.
        4. Returns the result of the _send_push_notification call.

        URL: /web-push/send-test
        Method: POST
        Body: JSON with "endpoint", "keys" ("p256dh" and "auth"), and optionally "title" and "message"

        Returns:
        - A JSON response with "success" and "message" from the _send_push_notification call.

        Notes:
        - If required fields are missing, an error message is returned with a 400 status code.
        - If no subscription matches all three values, an error message is returned with a 404 status code.
        - If the VAPID key recorded on the subscription no longer exists, an error message is returned
          with a 500 status code.
        """
        # Retrieving the subscription information from the incoming request
        content = request.json
        try:
            endpoint = content['endpoint']
            p256dh = content['keys']['p256dh']
            auth = content['keys']['auth']
        except (KeyError, TypeError):
            return jsonify(success=False, message="Missing 'endpoint', 'keys.p256dh', or 'keys.auth'"), 400

        # Looking up the subscription in the database
        subscription = Subscription.query.filter_by(endpoint=endpoint, p256dh=p256dh, auth=auth).first()
        if not subscription:
            print(f"Test failed due to missing subscription. Request: {json.dumps(content)}")
            return jsonify({"success": False, "message": "Subscription not found"}), 404

        # The subscription is found, so call the _send_push_notification method
        subscription_info = {
            "endpoint": subscription.endpoint,
            "keys": {
                "p256dh": subscription.p256dh,
                "auth": subscription.auth
            }
        }
        title = content.get('title', "Test Notification")
        message = content.get('message', "This is a test notification.")

        vapid_key = VAPIDKey.query.filter_by(id=subscription.vapid_key_id).first()
        if vapid_key is None:
            # presumably the keys were regenerated after this subscription was created -- TODO confirm
            print(f"Test failed due to missing VAPID key {subscription.vapid_key_id}")
            return jsonify(success=False, message="VAPID key for this subscription not found"), 500

        result = WebPushService._send_push_notification(
            subscription_info,
            {"title": title, "message": message},
            vapid_key
        )
        print(f"Test sent: {result['success']}")
        return jsonify(success=result["success"], message=result["message"]), 200
# Create the service when this module is loaded: the constructor configures the database,
# makes sure VAPID keys exist, and starts the notification thread.
web_push_service = WebPushService(app, "app")