Browse Source

Refactored to class with long-running thread

pull/1/head
Matthew Raymer 1 year ago
parent
commit
2b048c5a77
  1. 127
      app.py

127
app.py

@ -1,26 +1,30 @@
from cryptography.hazmat.primitives import serialization
from flask import Flask, request, jsonify
from models import db, VAPIDKey, Subscription
from pywebpush import webpush, WebPushException
from typing import Dict, Tuple, Union, Optional
from flask import Flask, request, jsonify, Response
from models import db, VAPIDKey, Subscription
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from pywebpush import webpush, WebPushException
import base64 import base64
import binascii import threading
import json
import os
import re
import time import time
def create_app(config_name): class WebPushService:
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///data/webpush.db' def __init__(self, config_name: str) -> None:
db.init_app(app) self.app = Flask(__name__)
self.app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///data/webpush.db'
db.init_app(self.app)
with self.app.app_context():
self._initialize()
self.daily_notification_thread = threading.Thread(target=self._send_daily_notifications)
self.daily_notification_thread.start()
def generate_and_save_vapid_keys():
def _generate_and_save_vapid_keys(self) -> None:
try: try:
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
public_key_bytes = private_key.public_key().public_bytes( public_key_bytes = private_key.public_key().public_bytes(
@ -34,7 +38,6 @@ def create_app(config_name):
format=serialization.PrivateFormat.PKCS8, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=serialization.NoEncryption()
) )
private_key_base64 = base64.b64encode(private_key_bytes).decode() private_key_base64 = base64.b64encode(private_key_bytes).decode()
key = VAPIDKey(public_key=public_key_base64, private_key=private_key_base64) key = VAPIDKey(public_key=public_key_base64, private_key=private_key_base64)
@ -45,74 +48,68 @@ def create_app(config_name):
print(f"Error generating VAPID keys: {e}") print(f"Error generating VAPID keys: {e}")
def _initialize(self) -> None:
def initialize():
if not VAPIDKey.query.first(): if not VAPIDKey.query.first():
generate_and_save_vapid_keys() self._generate_and_save_vapid_keys()
def send_push_notification(subscription_info, message, vapid_key): def _send_push_notification(self, subscription_info: Dict, message: Dict, vapid_key: VAPIDKey) -> bool:
result = True
try: try:
private_key_base64 = vapid_key.private_key
webpush( webpush(
subscription_info=subscription_info, subscription_info=subscription_info,
data=json.dumps(message), data=json.dumps(message),
vapid_private_key=private_key_base64, vapid_private_key=vapid_key.private_key,
vapid_claims={ vapid_claims={"sub": "mailto:admin@yourdomain.com"}
"sub": "mailto:info@timesafari.org"
}
) )
return True
except Exception as e: except WebPushException as ex:
result = False print(f"Failed to send push notification: {ex}")
print(f"Failed to send notification: {e}")
return result
def is_valid_base64_url(s):
return re.match(r'^[A-Za-z0-9_-]*$', s) is not None
def is_valid_server_key(server_key):
if not is_valid_base64_url(server_key):
return False return False
return len(server_key) == 88
@app.route('/web-push/clear_subscriptions', methods=['POST']) def _send_daily_notifications(self) -> None:
def clear_subscriptions(): while True:
with self.app.app_context():
all_subscriptions = Subscription.query.all()
vapid_key = VAPIDKey.query.first()
message = {"title": "Daily Update", "message": "Here's your daily update!"}
for subscription in all_subscriptions:
subscription_info = {
"endpoint": subscription.endpoint,
"keys": {
"p256dh": subscription.p256dh,
"auth": subscription.auth
}
}
self._send_push_notification(subscription_info, message, vapid_key)
time.sleep(24 * 60 * 60)
# Route handlers and other methods would go here...
@app.route('/regenerate_vapid', methods=['POST'])
def regenerate_vapid(self) -> Tuple[str, int]:
try: try:
Subscription.query.delete() with self.app.app_context():
return jsonify(message='Subscriptions cleared') VAPIDKey.query.delete()
db.session.commit()
except Exception as e: self._generate_and_save_vapid_keys()
return jsonify(error=f'Error clearing subscriptions: {str(e)}'), 500
@app.route('/web-push/regenerate_vapid_keys', methods=['POST']) return jsonify(success=True, message="VAPID keys regenerated successfully"), 200
def regenerate_vapid_keys():
try:
# Generate new VAPID keys
VAPIDKey.query.delete()
generate_and_save_vapid_keys()
return jsonify(message='VAPID keys regenerated successfully')
except Exception as e: except Exception as e:
return jsonify(error=f'Error regenerating VAPID keys: {str(e)}'), 500 return jsonify(error=f'Error regenerating VAPID keys: {str(e)}'), 500
@app.route('/web-push/vapid', methods=['GET']) @app.route('/get_vapid')
def get_vapid(): def get_vapid(self) -> Response:
key = VAPIDKey.query.first() key = VAPIDKey.query.first()
return jsonify(vapidKey=key.public_key) return jsonify(vapidKey=key.public_key)
@app.route('/web-push/subscribe', methods=['POST']) @app.route('/subscribe', methods=['POST'])
def subscribe(): def subscribe(self) -> Tuple[str, int]:
content = request.json content = request.json
vapid_key = VAPIDKey.query.first() vapid_key = VAPIDKey.query.first()
@ -137,13 +134,13 @@ def create_app(config_name):
} }
message = {"title": "Subscription Successful", "message": "Thank you for subscribing!"} message = {"title": "Subscription Successful", "message": "Thank you for subscribing!"}
success = send_push_notification(subscription_info, message, vapid_key) success = self._send_push_notification(subscription_info, message, vapid_key)
return jsonify(success=success, message=vapid_key.private_key) return jsonify(success=success, message=vapid_key.private_key)
@app.route('/web-push/unsubscribe', methods=['DELETE']) @app.route('/unsubscribe', methods=['POST'])
def unsubscribe(): def unsubscribe(self) -> Tuple[str, int]:
content = request.json content = request.json
endpoint = content['endpoint'] endpoint = content['endpoint']
subscription = Subscription.query.filter_by(endpoint=endpoint).first() subscription = Subscription.query.filter_by(endpoint=endpoint).first()
@ -152,11 +149,5 @@ def create_app(config_name):
db.session.delete(subscription) db.session.delete(subscription)
db.session.commit() db.session.commit()
return jsonify(success=True, message="Subscription deleted successfully") return jsonify(success=True, message="Subscription deleted successfully")
else: else:
return jsonify(success=False, error="Subscription not found"), 404 return jsonify(success=False, error="Subscription not found"), 404
with app.app_context():
initialize()
return app

Loading…
Cancel
Save