"""
Environment variables :
- SQLALCHEMY_DATABASE_URI : path to sqlite file , starting with " sqlite://// "
- ADMIN_PASSWORD : password for admin user for sensitive endpoints , defaults to ' admin '
"""
from cryptography . hazmat . backends import default_backend
from cryptography . hazmat . primitives import serialization
from cryptography . hazmat . primitives . asymmetric import ec
from flask import Flask , request , jsonify , Response
from models import db , VAPIDKey , Settings , Subscription
from pywebpush import webpush , WebPushException
from sqlalchemy import and_
from typing import Dict , Tuple
import base64
import datetime
import json
import os
import threading
import time
CONTACT_EMAIL = " mailto:info@timesafari.app "
PUSH_SERVER_VERSION = os . getenv ( ' PUSH_SERVER_VERSION ' )
app = Flask ( __name__ )
class WebPushService ( ) :
latest_subscription_run = None
"""
This class provides services for sending web push notifications .
"""
def __init__ ( self , app , config_name : str ) - > None :
"""
Initializes the WebPushService with the given application and configuration name .
Args :
- app : The application instance where the service will be attached .
- config_name ( str ) : The name of the configuration to be used .
Attributes :
- app : The application instance .
- daily_notification_thread ( threading . Thread ) : A thread to send daily notifications .
"""
# Setting the application instance
self . app = app
self . app . add_url_rule ( ' /web-push/regenerate-vapid ' , view_func = self . regenerate_vapid , methods = [ ' POST ' ] )
self . app . add_url_rule ( ' /web-push/ping ' , view_func = self . ping , methods = [ ' GET ' ] )
# Setting the database URI for the application
db_uri = os . getenv ( ' SQLALCHEMY_DATABASE_URI ' , ' sqlite:////app/instance/data/webpush.db ' )
# This relative path works in docker-compose
# This relative path works on a local run if you link to this dir from "var/app-instance"
#db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:///data/webpush.db')
self . app . config [ ' SQLALCHEMY_DATABASE_URI ' ] = db_uri
# Initializing the database with the application
db . init_app ( self . app )
# Creating a context for the application and initializing services
with self . app . app_context ( ) :
self . _initialize ( )
# Creating and starting a thread to send daily notifications
self . daily_notification_thread = threading . Thread ( target = self . _send_daily_notifications )
self . daily_notification_thread . start ( )
def _generate_and_save_vapid_keys ( self ) - > None :
"""
Generates VAPID ( Voluntary Application Server Identification ) keys and saves them to the database .
The method generates a pair of public and private keys using the elliptic curve SECP256R1 .
Both the public and private keys are then serialized and encoded in base64 format .
The keys are then stored in the database using a VAPIDKey model .
Notes :
- In case of any exception during the key generation or database operations , the error is printed to the console .
Raises :
- Exceptions raised by the key generation or database operations are caught and printed .
"""
try :
# Generating a private key using the elliptic curve SECP256R1
private_key = ec . generate_private_key ( ec . SECP256R1 ( ) , default_backend ( ) )
# Serializing and encoding the public key to base64 format
public_key_bytes = private_key . public_key ( ) . public_bytes (
encoding = serialization . Encoding . X962 ,
format = serialization . PublicFormat . UncompressedPoint
)
public_key_base64 = base64 . b64encode ( public_key_bytes ) . decode ( )
# Serializing and encoding the private key to base64 format
private_key_bytes = private_key . private_bytes (
encoding = serialization . Encoding . DER ,
format = serialization . PrivateFormat . PKCS8 ,
encryption_algorithm = serialization . NoEncryption ( )
)
private_key_base64 = base64 . b64encode ( private_key_bytes ) . decode ( )
# Saving the keys to the database
key = VAPIDKey ( public_key = public_key_base64 , private_key = private_key_base64 )
db . session . add ( key )
db . session . commit ( )
except Exception as e :
print ( f " Error generating VAPID keys: { str ( e ) } " )
raise e
def _initialize ( self ) - > None :
"""
Initializes the WebPushService by checking for the presence of VAPID keys in the database .
If no VAPID keys are found in the database , this method triggers the generation and storage
of new VAPID keys by invoking the ` _generate_and_save_vapid_keys ` method .
Notes :
- VAPID ( Voluntary Application Server Identification ) keys are essential for sending push notifications
to web clients , hence the check and generation if they don ' t exist.
"""
# Checking if there are any VAPID keys in the database
if not VAPIDKey . query . first ( ) :
# Generating and saving VAPID keys if none are found
self . _generate_and_save_vapid_keys ( )
@staticmethod
def _send_push_notification ( subscription_info : Dict , message : Dict , vapid_key : VAPIDKey ) - > Dict [ str , any ] :
"""
Sends a push notification using the provided subscription information , message , and VAPID key .
Args :
- subscription_info ( Dict ) : The information required to send the push notification to a specific client .
- message ( Dict ) : The actual message content to be sent as the push notification .
- vapid_key ( VAPIDKey ) : The VAPID key model instance containing the private key used for sending the notification .
Returns Dict with the following keys
- success : True if the push notification was sent successfully , False otherwise
- message : a string message with the resulting text , usually nothing on success
- result : request . Response https : / / requests . readthedocs . io / en / latest / api . html #requests.Response if the push notification was sent successfully
or False if there was an exception
Notes :
- The ` webpush ` function is used to send the notification .
- In case of any exception , especially a WebPushException , the error is printed to the console .
"""
# Sending the push notification using the webpush function
try :
result = webpush (
subscription_info = subscription_info ,
data = json . dumps ( message ) ,
vapid_private_key = vapid_key . private_key ,
vapid_claims = { " sub " : CONTACT_EMAIL }
)
# "because sometimes that's what I had to do to make it work!" - Matthew
time . sleep ( 1 )
return { " success " : result . status_code == 201 , " message " : result . text , " result " : result }
except WebPushException as ex :
now = datetime . datetime . now ( ) . isoformat ( )
endpoint = subscription_info [ ' endpoint ' ]
print ( f " { now } : Failed to send push notification for { endpoint } -- { ex } " , flush = True )
unsubscribed_msg = ' 410 Gone '
unsubscribed = False
if unsubscribed_msg in ex . args [ 0 ] :
subscription = Subscription . query . filter_by ( endpoint = endpoint ) . first ( )
# Delete the subscription if found
if subscription :
db . session . delete ( subscription )
db . session . commit ( )
print ( f " Committed delete of { subscription_info } " , flush = True )
unsubscribed = True
else :
print ( f " Could not find subscription at: { endpoint } " , flush = True )
else :
print ( " Error other than unsubscribed/expired. " , ex . args [ 0 ] , flush = True )
return { " success " : False , " message " : str ( ex ) , " error " : ex , " unsubscribed " : unsubscribed }
def _send_daily_notifications ( self ) - > None :
"""
Continuously sends daily push notifications to all subscribed clients .
This method :
1. Retrieves all subscription data from the database .
2. Constructs the push notification message .
3. Sends a push notification to each subscribed client .
4. Sleeps for 24 hours before repeating the process .
Notes :
- The method runs in an infinite loop , meaning it will keep sending notifications until the program is terminated .
- The notifications are sent using the ` _send_push_notification ` method .
- A context for the application is created to enable database operations .
- The message content for the daily update is hardcoded in this method .
"""
while True :
now = datetime . datetime . now ( )
print ( f " { now } - Starting to send subscriptions... " , flush = True )
# Creating a context for the application to enable database operations
with self . app . app_context ( ) :
# Retrieve the VAPID key from the database
vapid_key = VAPIDKey . query . first ( )
# Construct the push notification message
# The title value is a key, triggering the device to apply logic and customize both title and message.
# See https://gitea.anomalistdesign.com/trent_larson/crowd-funder-for-time-pwa/src/commit/1e6159869fc28ca6e6b5b3d186617d75705100b4/sw_scripts/additional-scripts.js#L65
UPDATE_TITLE = " DAILY_CHECK "
message = { " title " : UPDATE_TITLE , " message " : f " Update for { now } " }
# Determine the beginning and end time to check for subscriptions
settings = Settings . query . first ( )
if settings . running_notify_end_time is None :
# set all subscriptions without a time to the current time
# (these won't be picked-up in the current run, since thie current minute is the end minute)
Subscription . query . filter_by ( notify_time = None ) . update ( { Subscription . notify_time : now . strftime ( ' % H: % M ' ) } )
"""
Storing the HH : MM for the desired notification time isn ' t a bad idea.
However , the logic that compares directly to it is a bit complicated .
It would be more straightforward to save the next notification time in each record
and then update that every time this process runs . It would require raw SQL with
some calculations and many DB updates each day , but the logic would be more clear .
"""
# get the previous notify end time from the DB as a datetime
prev_notify_end_time = datetime . datetime . fromisoformat ( settings . prev_notify_end_time )
# if it's before midnight this morning, catch us up to the beginning of today:
if prev_notify_end_time < now . replace ( hour = 0 , minute = 0 , second = 0 , microsecond = 0 ) :
# make the start time the later of: prevNotifyEndTime or yesterday at this time
start_time = max (
prev_notify_end_time ,
# if the current time is later in the day, use that
# because the next run will pick up everything from midnight until now today
now . replace ( second = 0 , microsecond = 0 ) - datetime . timedelta ( days = 1 )
)
# make the end time right at midnight at the beginning of today
end_time = now . replace ( hour = 0 , minute = 0 , second = 0 , microsecond = 0 )
start_minute = start_time . strftime ( ' % H: % M ' )
# (we'd never catch anything if we used a non-zero start_minute and "00:00" as the end_minute)
end_minute = ' 23:60 ' # gotta catch "23:59", too
else :
# the start time is OK
start_time = prev_notify_end_time
# the end time is now
end_time = now . replace ( second = 0 , microsecond = 0 )
start_minute = start_time . strftime ( ' % H: % M ' )
end_minute = end_time . strftime ( ' % H: % M ' )
# This really should update & continue only if the running_notify_end_time is still None,
# just in case another thread started.
settings . running_notify_end_time = end_time . isoformat ( )
db . session . commit ( )
# this check was generated by Copilot; it's probably unnecessary
if settings . running_notify_end_time == end_time . isoformat ( ) :
# Now get the 'HH:MM' value for start & end so we can compare to the notify_time field
# get all the subscriptions that have a notify_time between start_minute inclusive and end_minute exclusive
all_subscriptions = Subscription . query . filter (
and_ ( Subscription . notify_time > = start_minute , Subscription . notify_time < end_minute )
)
# Send a push notification to each subscribed client
num_subscriptions = all_subscriptions . count ( )
for subscription in all_subscriptions :
subscription_info = {
" endpoint " : subscription . endpoint ,
" keys " : {
" p256dh " : subscription . p256dh ,
" auth " : subscription . auth
}
}
result = WebPushService . _send_push_notification ( subscription_info , message , vapid_key )
print (
f " Result from sub { subscription . id } : success= { result [ ' success ' ] } text= { result [ ' message ' ] } " ,
flush = True
)
settings . prev_notify_end_time = end_time . isoformat ( )
settings . running_notify_end_time = None
db . session . commit ( )
print ( f " { now } - Finished sending { num_subscriptions } subscriptions. " , flush = True )
else :
print ( f " { now } - Failed to update running_notify_end_time " , flush = True )
else :
print ( f " { now } - Stopped because we ' re already running a notification check. " , flush = True )
self . latest_subscription_run = now . isoformat ( )
# Sleep before repeating
time . sleep ( 5 * 60 )
# This is an endpoint, routed in __init__
def ping ( self ) - > str :
"""
Endpoint to show liveness info
Returns :
- Response : Text with some subscription - run info
"""
return f " pong ... version { PUSH_SERVER_VERSION } ... with latest subscription run at { self . latest_subscription_run } "
# This is an endpoint, routed in __init__
def regenerate_vapid ( self ) - > Tuple [ Response , int , dict [ str , str ] ] | Tuple [ Response , int ] :
"""
Endpoint to regenerate VAPID keys .
This method :
1. Deletes the current VAPID keys from the database .
2. Generates and stores new VAPID keys using the ` _generate_and_save_vapid_keys ` method .
URL : / web - push / regenerate - vapid
Method : POST
Header : Authentication : Basic . . .
Returns :
- Tuple with " success " as True or False , and " message " message string
Notes :
- If the operation is successful , a JSON response with a success message is returned with a 200 status code .
- If there ' s an error during the operation, a JSON response with the error message is returned with a 500 status code.
"""
# This default can be invoked thus: curl -X POST -H "Authorization: Basic YWRtaW46YWRtaW4=" localhost:3000/web-push/regenerate-vapid
envPassword = os . getenv ( ' ADMIN_PASSWORD ' , ' admin ' )
auth = request . authorization
if ( auth is None
or auth . username is None
or auth . username != ' admin '
or auth . password is None
or auth . password != envPassword ) :
return (
jsonify ( error = ' Wrong password ' ) ,
401 ,
{ ' WWW-Authenticate ' : ' Basic realm= " Login Required " ' }
)
# Creating a context for the application to enable database operations
try :
with self . app . app_context ( ) :
# Deleting the current VAPID keys from the database
VAPIDKey . query . delete ( )
db . session . commit ( )
# Generating and saving new VAPID keys
self . _generate_and_save_vapid_keys ( )
return jsonify ( success = True , message = " VAPID keys regenerated successfully " ) , 200
except Exception as e :
return jsonify ( success = False , message = f ' Error regenerating VAPID keys: { str ( e ) } ' ) , 500
@staticmethod
@app . route ( ' /web-push/vapid ' )
def vapid ( ) - > Response :
"""
Endpoint to retrieve the current VAPID public key .
This method fetches the VAPID public key from the database and returns it in a JSON response .
URL : / web - push / vapid
Method : GET
Returns :
- Response : A JSON response containing the VAPID public key .
Notes :
- The response contains a key " vapidKey " with the associated public key as its value .
"""
# Retrieving the VAPID key from the database
key = VAPIDKey . query . first ( )
# Returning the public key in a JSON response
return jsonify ( vapidKey = key . public_key )
@staticmethod
@app . route ( ' /web-push/subscribe ' , methods = [ ' POST ' ] )
def subscribe ( ) - > Tuple [ Response , int ] :
"""
Endpoint to handle new web push subscription requests .
This method :
1. Retrieves the VAPID key from the database .
2. Reads the subscription content from the incoming request .
3. Saves the subscription data to the database .
4. Sends a confirmation push notification to the new subscriber .
URL : / web - push / subscribe
Method : POST
Returns :
- A JSON response with " success " as True or False , " message " as a response message string , and potentially a " result " as a request . Response object from sending a test notification
Notes :
- If the operation is successful , a confirmation push notification is sent to the subscriber with a success message .
- If there are no VAPID keys available , an error message is returned with a 500 status code .
- There ' s a hardcoded 5-second sleep after saving the subscription, which might be intended for some delay before sending the confirmation. Ensure this is the desired behavior.
"""
# Retrieving the content from the incoming request
content = request . json
# Retrieving the VAPID key from the database
vapid_key = VAPIDKey . query . first ( )
# Checking if the VAPID key is available
if not vapid_key :
return jsonify ( success = False , message = " No VAPID keys available " ) , 500
# Constructing the notify_time string
notify_time = " 13:13 " # random time that is in most people's waking hours (server time, typically UTC)
if ( ' notifyTime ' in content ) and ( ' utcHour ' in content [ ' notifyTime ' ] ) :
notify_hour = content [ ' notifyTime ' ] [ ' utcHour ' ]
if ' minute ' in content [ ' notifyTime ' ] :
notify_minute = content [ ' notifyTime ' ] [ ' minute ' ]
else :
notify_minute = 0
notify_time = ' {:02d} ' . format ( notify_hour ) + " : " + ' {:02d} ' . format ( notify_minute )
# Creating a new Subscription instance with the provided data
subscription = Subscription ( auth = content [ ' keys ' ] [ ' auth ' ] ,
endpoint = content [ ' endpoint ' ] ,
notify_time = notify_time ,
p256dh = content [ ' keys ' ] [ ' p256dh ' ] ,
vapid_key_id = vapid_key . id )
# Saving the subscription data to the database
db . session . add ( subscription )
db . session . commit ( )
# Introducing a delay (ensure that gateway endpoint is available)
# ... which I'm now commenting out because there's no pending request so it doesn't make sense... we'll see if things still work
#time.sleep(10)
# Constructing the subscription information for the push notification
subscription_info = {
" endpoint " : subscription . endpoint ,
" keys " : {
" p256dh " : subscription . p256dh ,
" auth " : subscription . auth
}
}
# Creating a confirmation message for the push notification
message = { " title " : " Subscription Successful " , " message " : " Thank you for subscribing! " }
# Sending the confirmation push notification
result = WebPushService . _send_push_notification ( subscription_info , message , vapid_key )
# Returning the operation status
return jsonify ( success = result [ " success " ] , message = result [ " message " ] ) , 200
@staticmethod
@app . route ( ' /web-push/unsubscribe ' , methods = [ ' POST ' ] )
def unsubscribe ( ) - > Tuple [ Response , int ] :
"""
Endpoint to handle web push unsubscription requests .
This method :
1. Reads the endpoint from the incoming request .
2. Searches for the subscription in the database using the endpoint .
3. If found , deletes the subscription from the database .
URL : / web - push / unsubscribe
Method : POST
Returns :
- Tuple [ str , int ] : A JSON response indicating the success or failure of the operation , along with the appropriate HTTP status code .
Notes :
- If the unsubscription is successful , a JSON response with a success message is returned .
- If the subscription is not found in the database , an error message is returned with a 404 status code .
"""
# Retrieving the endpoint from the incoming request
content = request . json
endpoint = content [ ' endpoint ' ]
# Searching for the subscription in the database using the endpoint
subscription = Subscription . query . filter_by ( endpoint = endpoint ) . first ( )
# If the subscription is found, delete it from the database
if subscription :
db . session . delete ( subscription )
db . session . commit ( )
return jsonify ( success = True , message = " Subscription deleted successfully " ) , 200
# If the subscription is not found, return an error message
else :
return jsonify ( success = False , message = " Subscription not found " ) , 404
@staticmethod
@app . route ( ' /web-push/send-test ' , methods = [ ' POST ' ] )
def send_test ( ) - > Tuple [ Response , int ] :
"""
Endpoint to send a test push notification to a specific client .
This method :
1. Retrieves the subscription information from the incoming request .
2. Looks up the subscription in the database .
3. Calls the _send_push_notification method to send a test push notification .
- The subscription will include the " title " and " message " if supplied in the body object .
4. Returns the result of the _send_push_notification call .
URL : / web - push / send - test
Method : POST
Returns :
- A JSON response with the result of the _send_push_notification call .
Notes :
- The incoming request should contain the parameters " endpoint " , " p256dh " , and " auth " .
"""
# Retrieving the subscription information from the incoming request
content = request . json
endpoint = content [ ' endpoint ' ]
p256dh = content [ ' keys ' ] [ ' p256dh ' ]
auth = content [ ' keys ' ] [ ' auth ' ]
# Looking up the subscription in the database
subscription = Subscription . query . filter_by ( endpoint = endpoint , p256dh = p256dh , auth = auth ) . first ( )
# If the subscription is found, call the _send_push_notification method
if subscription :
subscription_info = {
" endpoint " : subscription . endpoint ,
" keys " : {
" p256dh " : subscription . p256dh ,
" auth " : subscription . auth
}
}
title = " Test Notification "
if " title " in content :
title = content [ ' title ' ]
message = " This is a test notification. "
if " message " in content :
message = content [ ' message ' ]
vapid_key = VAPIDKey . query . filter_by ( id = subscription . vapid_key_id ) . first ( )
result = WebPushService . _send_push_notification (
subscription_info ,
{ " title " : title , " message " : message } ,
vapid_key
)
print ( f " Test sent: { result [ ' success ' ] } " )
return jsonify ( success = result [ " success " ] , message = result [ " message " ] ) , 200
else :
print ( f " Test failed due to missing subscription. Request: { json . dumps ( content ) } " )
return jsonify ( { " success " : False , " message " : " Subscription not found " } ) , 404
web_push_service = WebPushService ( app , " app " )