You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
641 lines
29 KiB
641 lines
29 KiB
"""
|
|
Environment variables:
|
|
- ADMIN_PASSWORD: password for admin user for sensitive endpoints, defaults to 'admin'
|
|
- PUSH_SERVER_VERSION: optional version of server
|
|
- SECONDS_BETWEEN_NOTIFICATIONS: optional number of seconds between notification checks, defaults to 60 seconds
|
|
- SQLALCHEMY_DATABASE_URI: absolute path to sqlite file, starting with "sqlite:////"
|
|
"""
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
from flask import Flask, request, jsonify, Response
|
|
from models import db, VAPIDKey, Settings, Subscription
|
|
from pywebpush import webpush, WebPushException
|
|
from sqlalchemy import and_
|
|
from typing import Dict, Tuple
|
|
|
|
import base64
|
|
import datetime
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
|
|
# Contact address sent to push services in the VAPID "sub" claim.
CONTACT_EMAIL = "mailto:info@timesafari.app"

# On Time Safari, this title bypasses the filters and shows the message directly.
TITLE_DIRECT_NOTIFICATION = 'DIRECT_NOTIFICATION'
# On Time Safari, this title triggers the API check for the user's latest data.
TITLE_DAILY_INDIVIDUAL_CHECK = 'DAILY_CHECK'

# Optional version string for this server (None when the variable is unset).
PUSH_SERVER_VERSION = os.getenv('PUSH_SERVER_VERSION')

# Number of seconds the notification loop sleeps between checks; defaults to 60.
SECONDS_BETWEEN_NOTIFICATIONS_STR = os.getenv('SECONDS_BETWEEN_NOTIFICATIONS', '60')
try:
    SECONDS_BETWEEN_NOTIFICATIONS = int(SECONDS_BETWEEN_NOTIFICATIONS_STR)
except ValueError as err:
    # Still a ValueError (as before), but the message now names the misconfigured variable.
    raise ValueError(
        "SECONDS_BETWEEN_NOTIFICATIONS must be an integer number of seconds, "
        f"got {SECONDS_BETWEEN_NOTIFICATIONS_STR!r}"
    ) from err
|
|
|
|
# Module-level Flask application; WebPushService registers its routes on it
# (via the @app.route decorators and add_url_rule calls in the class below).
app = Flask(__name__)
|
|
|
|
class WebPushService():
    """
    This class provides services for sending web push notifications.

    It attaches the push endpoints to a Flask application, makes sure VAPID keys exist in the
    database, and runs a background thread that delivers the scheduled notifications.
    """

    # ISO-8601 timestamp of the most recent completed pass of the notification loop
    # (None until the first pass completes). Reported by the ping endpoint.
    latest_subscription_run = None

    def __init__(self, app, config_name: str) -> None:
        """
        Initializes the WebPushService with the given application and configuration name.

        Args:
        - app: The application instance where the service will be attached.
        - config_name (str): The name of the configuration to be used (currently unused).

        Attributes:
        - app: The application instance.
        - daily_notification_thread (threading.Thread): A thread to send daily notifications.
        """

        # Setting the application instance
        self.app = app
        self.app.add_url_rule('/web-push/regenerate-vapid', view_func=self.regenerate_vapid, methods=['POST'])
        self.app.add_url_rule('/web-push/ping', view_func=self.ping, methods=['GET'])

        # Setting the database URI for the application.
        # An absolute path such as 'sqlite:////app/instance/data/webpush.db' can be supplied through
        # the environment variable. The relative default below is the one that was actually in effect:
        # This relative path works in docker-compose
        # This relative path works on a local run if you link to the dir with app.py from "var/app-instance"
        db_uri = os.getenv('SQLALCHEMY_DATABASE_URI', 'sqlite:///data/webpush.db')
        self.app.config['SQLALCHEMY_DATABASE_URI'] = db_uri

        # Initializing the database with the application
        db.init_app(self.app)

        # Creating a context for the application and initializing services
        with self.app.app_context():
            self._initialize()

        # Creating and starting a thread to send daily notifications
        self.daily_notification_thread = threading.Thread(target=self._send_daily_notifications)
        self.daily_notification_thread.start()

    def _generate_and_save_vapid_keys(self) -> None:
        """
        Generates VAPID (Voluntary Application Server Identification) keys and saves them to the database.

        The method generates a pair of public and private keys using the elliptic curve SECP256R1.
        Both the public and private keys are then serialized and encoded in base64 format.
        The keys are then stored in the database using a VAPIDKey model.

        Raises:
        - Any exception raised by the key generation or database operations is printed to the
          console and then re-raised to the caller.
        """
        try:
            # Generating a private key using the elliptic curve SECP256R1
            private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())

            # Serializing and encoding the public key to base64 format
            public_key_bytes = private_key.public_key().public_bytes(
                encoding=serialization.Encoding.X962,
                format=serialization.PublicFormat.UncompressedPoint
            )
            public_key_base64 = base64.b64encode(public_key_bytes).decode()

            # Serializing and encoding the private key to base64 format
            private_key_bytes = private_key.private_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            )
            private_key_base64 = base64.b64encode(private_key_bytes).decode()

            # Saving the keys to the database
            key = VAPIDKey(public_key=public_key_base64, private_key=private_key_base64)
            db.session.add(key)
            db.session.commit()

        except Exception as e:
            print(f"Error generating VAPID keys: {str(e)}")
            # Bare raise keeps the original traceback intact.
            raise

    def _initialize(self) -> None:
        """
        Initializes the WebPushService by checking for the presence of VAPID keys in the database.

        If no VAPID keys are found in the database, this method triggers the generation and storage
        of new VAPID keys by invoking the `_generate_and_save_vapid_keys` method.

        Notes:
        - Must be called inside an application context (it queries the database).
        """
        if not VAPIDKey.query.first():
            self._generate_and_save_vapid_keys()

    @staticmethod
    def _subscription_info(endpoint, p256dh, auth) -> Dict:
        """Builds the subscription dictionary in the shape passed to `webpush`."""
        return {
            "endpoint": endpoint,
            "keys": {
                "p256dh": p256dh,
                "auth": auth
            }
        }

    @staticmethod
    def _is_int_in_range(value, low: int, high: int) -> bool:
        """Returns True if value is a real integer (not a bool) with low <= value <= high."""
        return isinstance(value, int) and not isinstance(value, bool) and low <= value <= high

    @staticmethod
    def _send_push_notification(subscription_info: Dict, message: Dict, vapid_private_key: str) -> Dict[str, object]:
        """
        Sends a push notification using the provided subscription information, message, and VAPID key.

        Args:
        - subscription_info (Dict): The information required to send the push notification to a specific client.
        - message (Dict): The actual message content to be sent as the push notification (sent as JSON).
        - vapid_private_key (str): The private key used for sending the notification.

        Returns Dict with the following keys
        - success: True if the push service answered with status 201, False otherwise
        - message: a string message with the resulting text, usually nothing on success
        - result: the response object returned by `webpush` (only on the non-exception path)
        - error: the WebPushException (only when one was raised)
        - unsubscribed: True if the subscription was deleted because it is gone (only when a
          WebPushException was raised)

        Notes:
        - The `webpush` function is used to send the notification.
        - Only WebPushException is handled here; any other exception propagates to the caller.
        - When the push service reports "410 Gone", the first subscription with that endpoint is
          deleted from the database, so this must run inside an application context.
        """

        try:
            result = webpush(
                subscription_info=subscription_info,
                data=json.dumps(message),
                vapid_private_key=vapid_private_key,
                vapid_claims={"sub": CONTACT_EMAIL}
            )
            # "because sometimes that's what I had to do to make it work!" - Matthew
            time.sleep(1)
            return {"success": result.status_code == 201, "message": result.text, "result": result}

        except WebPushException as ex:
            now = datetime.datetime.now().isoformat()
            endpoint = subscription_info['endpoint']
            print(f"{now}: Failed to send push notification for {json.dumps(endpoint)} -- {ex}", flush=True)

            # Text of the failure; guarded so an exception without args cannot raise IndexError here.
            error_text = str(ex.args[0]) if ex.args else str(ex)
            response = getattr(ex, 'response', None)

            # Detect an expired subscription from the message text, or from the HTTP status
            # when the exception carries a response.
            unsubscribed_msg = '410 Gone'
            gone = (unsubscribed_msg in error_text) or (
                response is not None and getattr(response, 'status_code', None) == 410
            )

            unsubscribed = False
            if gone:
                subscription = Subscription.query.filter_by(endpoint=endpoint).first()

                # Delete the subscription if found
                if subscription:
                    db.session.delete(subscription)
                    db.session.commit()
                    print(f"Committed delete of {subscription_info}", flush=True)
                    unsubscribed = True
                else:
                    print(f"Could not find subscription at: {endpoint}", flush=True)
            else:
                print("Error other than unsubscribed/expired.", error_text, flush=True)

            return {"success": False, "message": str(ex), "error": ex, "unsubscribed": unsubscribed}

    @staticmethod
    def _notification_window(prev_notify_end_time: datetime.datetime,
                             now: datetime.datetime) -> Tuple[datetime.datetime, str, str]:
        """
        Computes which 'HH:MM' notify times have to be served in this pass.

        Storing the HH:MM for the desired notification time isn't a bad idea.
        However, the logic that compares directly to it is a bit complicated.
        It would be more straightforward to save the next notification time in each record
        and then update that every time this process runs. It would require raw SQL with
        some calculations and many DB updates each day, but the logic would be more clear.

        Args:
        - prev_notify_end_time: the end of the previously completed window.
        - now: the current time (same clock and naive-ness as prev_notify_end_time).

        Returns:
        - Tuple (end_time, start_minute, end_minute): the end of this window as a datetime, and
          the 'HH:MM' bounds (start inclusive, end exclusive) to compare with notify_time.
        """
        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        this_minute = now.replace(second=0, microsecond=0)

        if prev_notify_end_time < midnight:
            # The previous window ended before midnight this morning: catch up to the beginning
            # of today. The start is the later of the previous end or yesterday at this time,
            # because the next run will pick up everything from midnight until now today.
            start_time = max(prev_notify_end_time, this_minute - datetime.timedelta(days=1))
            # (we'd never catch anything if we used a non-zero start_minute and "00:00" as the end_minute)
            # so the end is '23:60', which also catches "23:59".
            return midnight, start_time.strftime('%H:%M'), '23:60'

        # The start time is OK and the end time is now.
        return this_minute, prev_notify_end_time.strftime('%H:%M'), this_minute.strftime('%H:%M')

    @staticmethod
    def _notify_subscription(subscription, vapid_private_key: str) -> None:
        """Builds the payload for one subscription, sends it, and prints the outcome."""
        # Read these before sending: the send may delete this row when the endpoint is gone.
        subscription_id = subscription.id
        subscription_info = WebPushService._subscription_info(
            subscription.endpoint, subscription.p256dh, subscription.auth
        )

        # Construct the push notification message
        # The title value is a key, defaulting to trigger the device to apply logic and customize both title and message.
        # See https://gitea.anomalistdesign.com/trent_larson/crowd-funder-for-time-pwa/src/commit/1e6159869fc28ca6e6b5b3d186617d75705100b4/sw_scripts/additional-scripts.js#L65
        payload = {"title": subscription.notify_type}
        if subscription.message:
            payload['message'] = subscription.message
        elif subscription.notify_type == TITLE_DIRECT_NOTIFICATION:
            # They should get some message.
            payload['message'] = "Just a friendly reminder: click and share some gratitude with the world."

        try:
            result = WebPushService._send_push_notification(subscription_info, payload, vapid_private_key)
        except Exception as ex:
            # e.g. a network error: one unreachable endpoint must not abort the whole batch.
            db.session.rollback()
            print(f"Result from sub {subscription_id}: success=False message={ex}", flush=True)
            return

        print(
            f"Result from sub {subscription_id}: success={result['success']} message={result['message']}",
            flush=True
        )

    def _run_notification_pass(self, now: datetime.datetime) -> None:
        """
        Runs one notification check. Must be called inside an application context.

        Args:
        - now: the current time as a naive UTC datetime.
        """
        # Retrieve the VAPID key and the settings row from the database
        vapid_key = VAPIDKey.query.first()
        settings = Settings.query.first()
        if vapid_key is None or settings is None:
            print(f"{now} - Skipping notification check: VAPID key or settings row is missing.", flush=True)
            return

        # only do this if we're not already inside one of these loops
        if settings.running_notify_end_time is not None:
            print(f"{now} - Subscription check stopped because we're already running a notification check.", flush=True)
            return

        # get the previous notify end time from the DB as a datetime
        prev_notify_end_time = datetime.datetime.fromisoformat(settings.prev_notify_end_time)
        end_time, start_minute, end_minute = self._notification_window(prev_notify_end_time, now)

        # This really should update & continue only if the running_notify_end_time is still None,
        # just in case another thread started.
        settings.running_notify_end_time = end_time.isoformat()

        # set all subscriptions without a time to the current time
        # (these won't be picked-up in the current run, since the current minute is the end minute)
        Subscription.query.filter_by(notify_time=None).update({Subscription.notify_time: now.strftime('%H:%M')})

        db.session.commit()

        # this check was generated by Copilot; it's probably unnecessary
        if settings.running_notify_end_time != end_time.isoformat():
            print(f"{now} - Failed to update running_notify_end_time", flush=True)
            return

        try:
            # get all the subscriptions that have a notify_time between start_minute inclusive and end_minute exclusive
            subscriptions = Subscription.query.filter(
                and_(Subscription.notify_time >= start_minute, Subscription.notify_time < end_minute)
            ).all()
            num_subscriptions = len(subscriptions)

            # Send a push notification to each subscribed client
            for subscription in subscriptions:
                self._notify_subscription(subscription, vapid_key.private_key)
        except Exception:
            # Release the DB-level "running" marker so later passes are not blocked forever.
            # prev_notify_end_time is left alone so this window is retried on the next pass.
            db.session.rollback()
            settings.running_notify_end_time = None
            db.session.commit()
            raise

        settings.prev_notify_end_time = end_time.isoformat()
        settings.running_notify_end_time = None
        db.session.commit()
        print(f"{now} - Finished sending {num_subscriptions} subscriptions.", flush=True)

    def _send_daily_notifications(self) -> None:
        """
        Continuously sends daily push notifications to all subscribed clients.

        This method, in an endless loop:
        1. Determines the window of notify times that are due.
        2. Retrieves the matching subscriptions from the database.
        3. Sends a push notification to each of them.
        4. Sleeps for SECONDS_BETWEEN_NOTIFICATIONS seconds and repeats the process.

        Notes:
        - The method runs in an infinite loop, meaning it will keep sending notifications until the program is terminated.
        - The notifications are sent using the `_send_push_notification` method.
        - A context for the application is created to enable database operations.
        - A failing pass is printed and the loop continues; `latest_subscription_run` is only
          updated when a pass completes without raising.
        """

        while True:
            # Subscriptions store notify_time as 'HH:MM' in UTC (see the subscribe endpoint), so
            # the comparison clock is UTC too. It is kept naive to stay comparable with the naive
            # ISO timestamps stored in the settings row.
            now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

            try:
                # Creating a context for the application to enable database operations
                with self.app.app_context():
                    self._run_notification_pass(now)
            except Exception as ex:
                # Never let one bad pass kill the notification thread.
                print(f"{now} - Notification check failed: {ex}", flush=True)
            else:
                self.latest_subscription_run = now.isoformat()

            # Sleep before repeating
            time.sleep(SECONDS_BETWEEN_NOTIFICATIONS)

    # This is an endpoint, routed in __init__
    def ping(self) -> str:
        """
        Endpoint to show liveness info

        Returns:
        - Response: Text with some subscription-run info
        """

        return f"pong ... version {PUSH_SERVER_VERSION} ... with latest subscription run at {self.latest_subscription_run}"

    # This is an endpoint, routed in __init__
    def regenerate_vapid(self) -> Tuple[Response, int, dict[str, str]] | Tuple[Response, int]:
        """
        Endpoint to regenerate VAPID keys.

        This method:
        1. Deletes the current VAPID keys from the database.
        2. Generates and stores new VAPID keys using the `_generate_and_save_vapid_keys` method.

        URL: /web-push/regenerate-vapid
        Method: POST
        Header: Authorization: Basic ...

        Returns:
        - Tuple with a JSON response and a status code (plus a WWW-Authenticate header on 401)

        Notes:
        - If the credentials are missing or wrong, a 401 status code is returned.
        - If the operation is successful, a JSON response with a success message is returned with a 200 status code.
        - If there's an error during the operation, a JSON response with the error message is returned with a 500 status code.
        """
        import hmac  # local import: only needed for the constant-time password comparison

        # This default can be invoked thus: curl -X POST -H "Authorization: Basic YWRtaW46YWRtaW4=" localhost:3000/web-push/regenerate-vapid
        # NOTE(review): the fallback password 'admin' is insecure; set ADMIN_PASSWORD in production.
        env_password = os.getenv('ADMIN_PASSWORD', 'admin')
        auth = request.authorization
        authorized = (
            auth is not None
            and auth.username == 'admin'
            and auth.password is not None
            # compare_digest avoids leaking the password through response timing
            and hmac.compare_digest(auth.password.encode('utf-8'), env_password.encode('utf-8'))
        )
        if not authorized:
            return (
                jsonify(error='Wrong password'),
                401,
                {'WWW-Authenticate': 'Basic realm="Login Required"'}
            )

        # Creating a context for the application to enable database operations
        try:
            with self.app.app_context():
                # Deleting the current VAPID keys from the database
                VAPIDKey.query.delete()
                db.session.commit()

                # Generating and saving new VAPID keys
                self._generate_and_save_vapid_keys()

                return jsonify(success=True, message="VAPID keys regenerated successfully"), 200

        except Exception as e:
            return jsonify(success=False, message=f'Error regenerating VAPID keys: {str(e)}'), 500

    @staticmethod
    @app.route('/web-push/vapid')
    def vapid() -> Response | Tuple[Response, int]:
        """
        Endpoint to retrieve the current VAPID public key.

        This method fetches the VAPID public key from the database and returns it in a JSON response.

        URL: /web-push/vapid
        Method: GET

        Returns:
        - Response: A JSON response containing the VAPID public key.

        Notes:
        - The response contains a key "vapidKey" with the associated public key as its value.
        - If there are no VAPID keys available, an error message is returned with a 500 status code.
        """

        # Retrieving the VAPID key from the database
        key = VAPIDKey.query.first()
        if key is None:
            return jsonify(success=False, message="No VAPID keys available"), 500

        # Returning the public key in a JSON response
        return jsonify(vapidKey=key.public_key)

    @staticmethod
    @app.route('/web-push/subscribe', methods=['POST'])
    def subscribe() -> Tuple[Response, int]:
        """
        Endpoint to handle new web push subscription requests.

        This method:
        1. Reads and validates the subscription content from the incoming request.
        2. Retrieves the VAPID key from the database.
        3. Saves the subscription data to the database.
        4. Sends a confirmation push notification to the new subscriber.

        URL: /web-push/subscribe
        Method: POST

        Body:
        - JSON object with the following keys:
          - endpoint: the endpoint URL for the push notification
          - keys: a JSON object with the following keys:
            - p256dh: the P-256 elliptic curve Diffie-Hellman key pair
            - auth: the authentication secret for the push subscription
          - message: an optional string message to send to the subscriber, max 100 characters
            The message is only used for certain notifyType values like TITLE_DIRECT_NOTIFICATION
          - notifyTime: a JSON object with the following keys:
            - utcHour: the hour in UTC, an integer from 0 to 23
            - minute: the minute in UTC, an integer from 0 to 59 (optional, defaults to 0)
          - notifyType: optional type of notification to send
            If no type is sent, the internal type is set to TITLE_DAILY_INDIVIDUAL_CHECK

        Returns:
        - A JSON response with "success" as True or False and "message" as a response message string

        Notes:
        - Invalid or missing input is answered with a 400 status code.
        - If there are no VAPID keys available, an error message is returned with a 500 status code.
        - Once the subscription is saved the status code is 200; "success" then reflects whether
          the confirmation push notification could be delivered.
        """

        # Retrieving the content from the incoming request (None if the body is not valid JSON)
        content = request.get_json(silent=True)
        if not isinstance(content, dict) or ('endpoint' not in content) or ('keys' not in content):
            return jsonify(success=False, message="Missing subscription information"), 400
        keys = content['keys']
        if not isinstance(keys, dict) or ('p256dh' not in keys) or ('auth' not in keys):
            return jsonify(success=False, message="Missing subscription keys information"), 400
        notify_time_info = content.get('notifyTime')
        if not isinstance(notify_time_info, dict) or ('utcHour' not in notify_time_info):
            return jsonify(success=False, message="Missing notifyTime information"), 400
        if ('notifyType' in content) and (content['notifyType'] not in [TITLE_DIRECT_NOTIFICATION, TITLE_DAILY_INDIVIDUAL_CHECK]):
            return jsonify(success=False, message="Invalid notifyType"), 400

        # Constructing the notify_time string. Values are validated first: anything that is not an
        # in-range integer would either fail to format or produce a time that can never match.
        notify_hour = notify_time_info['utcHour']
        notify_minute = notify_time_info.get('minute', 0)
        if not (WebPushService._is_int_in_range(notify_hour, 0, 23)
                and WebPushService._is_int_in_range(notify_minute, 0, 59)):
            return jsonify(success=False, message="Invalid notifyTime information"), 400
        notify_time = f"{notify_hour:02d}:{notify_minute:02d}"

        notify_type = content.get('notifyType', TITLE_DAILY_INDIVIDUAL_CHECK)

        # check that the message is a string of 100 characters or less
        message = content.get('message')
        if message is not None:
            if not isinstance(message, str):
                return jsonify(success=False, message="Message must be a string."), 400
            if len(message) > 100:
                return jsonify(success=False, message="Message is too long. Max 100 characters."), 400

        # Retrieving the VAPID key from the database
        vapid_key = VAPIDKey.query.first()

        # Checking if the VAPID key is available
        if not vapid_key:
            return jsonify(success=False, message="No VAPID keys available"), 500

        # Creating a new Subscription instance with the provided data
        subscription = Subscription(auth=keys['auth'],
                                    created_date=datetime.datetime.now().isoformat(),
                                    endpoint=content['endpoint'],
                                    message=message,
                                    notify_time=notify_time,
                                    notify_type=notify_type,
                                    p256dh=keys['p256dh'],
                                    vapid_key_id=vapid_key.id)

        # Saving the subscription data to the database
        db.session.add(subscription)
        db.session.commit()

        # Constructing the subscription information for the push notification
        subscription_info = WebPushService._subscription_info(
            subscription.endpoint, subscription.p256dh, subscription.auth
        )

        # Creating a confirmation message for the push notification
        confirmation = {"title": "Subscription Successful", "message": "Thank you for subscribing!"}

        # Sending the confirmation push notification
        result = WebPushService._send_push_notification(subscription_info, confirmation, vapid_key.private_key)

        # Returning the operation status
        return jsonify(success=result["success"], message=result["message"]), 200

    @staticmethod
    @app.route('/web-push/unsubscribe', methods=['POST'])
    def unsubscribe() -> Tuple[Response, int]:
        """
        Endpoint to handle web push unsubscription requests.

        This method:
        1. Reads the endpoint from the incoming request.
        2. Deletes the matching subscriptions from the database.

        URL: /web-push/unsubscribe
        Method: POST

        Body:
        - JSON object with the following keys:
          - endpoint: the endpoint URL for the push notification
          - notifyType: "DAILY_CHECK" or "DIRECT_NOTIFICATION" -- if the key is omitted, all
            subscriptions for the endpoint are deleted

        Returns:
        - Tuple[Response, int]: A JSON response indicating the success or failure of the operation, along with the appropriate HTTP status code.

        Notes:
        - If the endpoint is missing, an error message is returned with a 400 status code.
        - Otherwise a success message is returned with a 200 status code, whether or not any
          subscription matched.
        """

        # Retrieving the endpoint from the incoming request (None if the body is not valid JSON)
        content = request.get_json(silent=True)
        endpoint = content.get('endpoint') if isinstance(content, dict) else None
        if endpoint is None:
            return jsonify(success=False, message="Missing endpoint information"), 400

        # Selecting the subscriptions to delete: by endpoint, narrowed by notifyType when given
        doomed = db.session.query(Subscription).filter(Subscription.endpoint == endpoint)
        if 'notifyType' in content:
            notify_type = content['notifyType']
            print(f"Deleting subscription for {endpoint} with notifyType {notify_type}", flush=True)
            doomed = doomed.filter(Subscription.notify_type == notify_type)
        else:
            print(f"Deleting all subscriptions for {endpoint}", flush=True)

        doomed.delete(synchronize_session=False)
        db.session.commit()

        return jsonify(success=True, message="Subscription deleted successfully"), 200

    @staticmethod
    @app.route('/web-push/send-test', methods=['POST'])
    def send_test() -> Tuple[Response, int]:
        """
        Endpoint to send a test push notification to a specific client.

        This method:
        1. Retrieves the subscription information from the incoming request.
        2. Looks up the subscription in the database.
        3. Calls the _send_push_notification method to send a test push notification.
           - The notification will include the "title" and "message" if supplied in the body object.
        4. Returns the result of the _send_push_notification call.

        URL: /web-push/send-test
        Method: POST

        Returns:
        - A JSON response with the result of the _send_push_notification call.

        Notes:
        - The incoming request should contain "endpoint" and a "keys" object with "p256dh" and "auth";
          otherwise a 400 status code is returned.
        - If no matching subscription exists, a 404 status code is returned.
        - If the VAPID key the subscription was created with no longer exists (for example after
          the keys were regenerated), a 500 status code is returned.
        """

        # Retrieving the subscription information from the incoming request
        content = request.get_json(silent=True)
        if not isinstance(content, dict):
            return jsonify({"success": False, "message": "Missing subscription information"}), 400
        keys = content.get('keys', {})
        if not isinstance(keys, dict):
            keys = {}
        endpoint = content.get('endpoint')
        p256dh = keys.get('p256dh')
        auth = keys.get('auth')
        if (endpoint is None) or (p256dh is None) or (auth is None):
            return jsonify({"success": False, "message": "Missing subscription information"}), 400

        # Looking up the subscription in the database
        subscription = Subscription.query.filter_by(endpoint=endpoint, p256dh=p256dh, auth=auth).first()
        if not subscription:
            print(f"Test failed due to missing subscription. Request: {json.dumps(content)}", flush=True)
            return jsonify({"success": False, "message": "Subscription not found"}), 404

        # The subscription must be pushed with the key it was created with.
        vapid_key = VAPIDKey.query.filter_by(id=subscription.vapid_key_id).first()
        if vapid_key is None:
            return jsonify(success=False, message="VAPID key for this subscription is no longer available"), 500

        title = content.get('title', "Test Notification")
        message = content.get('message', "This is a test notification.")

        result = WebPushService._send_push_notification(
            WebPushService._subscription_info(endpoint, p256dh, auth),
            {"title": title, "message": message},
            vapid_key.private_key
        )

        print(f"Test sent: success={result['success']} message={result['message']}", flush=True)
        return jsonify(success=result["success"], message=result["message"]), 200
|
|
|
|
|
|
# Instantiated at import time: the constructor registers the remaining routes, initialises the
# database, makes sure VAPID keys exist, and starts the notification thread.
web_push_service = WebPushService(app, "app")
|
|
|