|
@ -1,9 +1,13 @@ |
|
|
from cryptography.hazmat.primitives import serialization |
|
|
from cryptography.hazmat.primitives import serialization |
|
|
from flask import Flask, request, jsonify |
|
|
from flask import Flask, request, jsonify |
|
|
from models import db, VAPIDKey, Subscription |
|
|
from models import db, VAPIDKey, Subscription |
|
|
from py_vapid import Vapid |
|
|
|
|
|
from pywebpush import webpush, WebPushException |
|
|
from pywebpush import webpush, WebPushException |
|
|
|
|
|
|
|
|
|
|
|
from cryptography.hazmat.backends import default_backend |
|
|
|
|
|
from cryptography.hazmat.primitives import serialization |
|
|
|
|
|
from cryptography.hazmat.primitives.asymmetric import ec |
|
|
|
|
|
|
|
|
|
|
|
import binascii |
|
|
import base64 |
|
|
import base64 |
|
|
import json |
|
|
import json |
|
|
import os |
|
|
import os |
|
@ -16,16 +20,24 @@ def create_app(config_name): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_and_save_vapid_keys(): |
|
|
def generate_and_save_vapid_keys(): |
|
|
vapid = Vapid() |
|
|
|
|
|
try: |
|
|
try: |
|
|
vapid.generate_keys() |
|
|
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) |
|
|
private_key = vapid.private_pem().decode('utf-8').strip() |
|
|
public_key_bytes = private_key.public_key().public_bytes( |
|
|
public_key = vapid.public_pem().decode('utf-8').strip() |
|
|
encoding=serialization.Encoding.X962, |
|
|
|
|
|
format=serialization.PublicFormat.UncompressedPoint |
|
|
|
|
|
) |
|
|
|
|
|
public_key_base64 = base64.b64encode(public_key_bytes).decode() |
|
|
|
|
|
|
|
|
|
|
|
private_key_hex = binascii.hexlify(private_key.private_bytes( |
|
|
|
|
|
encoding=serialization.Encoding.DER, |
|
|
|
|
|
format=serialization.PrivateFormat.PKCS8, |
|
|
|
|
|
encryption_algorithm=serialization.NoEncryption() |
|
|
|
|
|
)).decode() |
|
|
|
|
|
|
|
|
except Exception as e: |
|
|
except Exception as e: |
|
|
print(f"Error generating VAPID keys: {e}") |
|
|
print(f"Error generating VAPID keys: {e}") |
|
|
|
|
|
|
|
|
key = VAPIDKey(public_key=public_key, private_key=private_key) |
|
|
key = VAPIDKey(public_key=public_key_base64, private_key=private_key_hex) |
|
|
db.session.add(key) |
|
|
db.session.add(key) |
|
|
db.session.commit() |
|
|
db.session.commit() |
|
|
|
|
|
|
|
@ -37,48 +49,58 @@ def create_app(config_name): |
|
|
|
|
|
|
|
|
def send_push_notification(subscription_info, message, vapid_key): |
|
|
def send_push_notification(subscription_info, message, vapid_key): |
|
|
try: |
|
|
try: |
|
|
|
|
|
private_key_bytes = bytes.fromhex(vapid_key.private_key) |
|
|
|
|
|
|
|
|
webpush( |
|
|
webpush( |
|
|
subscription_info=subscription_info, |
|
|
subscription_info=subscription_info, |
|
|
data=json.dumps(message), |
|
|
data=json.dumps(message), |
|
|
vapid_private_key=vapid_key.private_key, |
|
|
vapid_private_key=private_key_bytes, |
|
|
vapid_claims={ |
|
|
vapid_claims={ |
|
|
"sub": "mailto:your-email@example.com" |
|
|
"sub": "mailto:your-email@example.com" |
|
|
} |
|
|
} |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
except WebPushException as e: |
|
|
except WebPushException as e: |
|
|
print(f"Failed to send notification: {e}") |
|
|
print(f"Failed to send notification: {e}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_valid_base64_url(s): |
|
|
def is_valid_base64_url(s): |
|
|
return re.match(r'^[A-Za-z0-9_-]*$', s) is not None |
|
|
return re.match(r'^[A-Za-z0-9_-]*$', s) is not None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_valid_server_key(server_key): |
|
|
def is_valid_server_key(server_key): |
|
|
if not is_valid_base64_url(server_key): |
|
|
if not is_valid_base64_url(server_key): |
|
|
return False |
|
|
return False |
|
|
|
|
|
|
|
|
return len(server_key) == 88 |
|
|
return len(server_key) == 88 |
|
|
|
|
|
|
|
|
@app.route('/web-push/vapid', methods=['GET']) |
|
|
|
|
|
def get_vapid(): |
|
|
@app.route('/web-push/clear_subscriptions', methods=['POST']) |
|
|
key = VAPIDKey.query.first() |
|
|
def clear_subscriptions(): |
|
|
pem_bytes = key.private_key.encode("utf-8") |
|
|
try: |
|
|
private_key = serialization.load_pem_private_key(pem_bytes, password=None) |
|
|
Subscription.query.delete() |
|
|
public_key = private_key.public_key() |
|
|
return jsonify(message='Subscriptions cleared') |
|
|
public_key_der = public_key.public_bytes( |
|
|
|
|
|
encoding=serialization.Encoding.DER, |
|
|
|
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
base64_data = base64.urlsafe_b64encode(public_key_der) |
|
|
except Exception as e: |
|
|
base64_str = base64_data.decode('utf-8') |
|
|
return jsonify(error=f'Error clearing subscriptions: {str(e)}'), 500 |
|
|
base64_str = base64_str.rstrip('=') |
|
|
|
|
|
|
|
|
|
|
|
if is_valid_server_key(base64_str): |
|
|
|
|
|
return jsonify(vapidKey=base64_str) |
|
|
|
|
|
|
|
|
|
|
|
else: |
|
|
@app.route('/web-push/regenerate_vapid_keys', methods=['POST']) |
|
|
return jsonify(error=f'VAPID server key is not valid. {len(base64_str)} {is_valid_base64_url(base64_str)}'), 422 |
|
|
def regenerate_vapid_keys(): |
|
|
|
|
|
try: |
|
|
|
|
|
# Generate new VAPID keys |
|
|
|
|
|
VAPIDKey.query.delete() |
|
|
|
|
|
generate_and_save_vapid_keys() |
|
|
|
|
|
return jsonify(message='VAPID keys regenerated successfully') |
|
|
|
|
|
|
|
|
return jsonify(error='No VAPID keys found'), 404 |
|
|
except Exception as e: |
|
|
|
|
|
return jsonify(error=f'Error regenerating VAPID keys: {str(e)}'), 500 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/web-push/vapid', methods=['GET']) |
|
|
|
|
|
def get_vapid(): |
|
|
|
|
|
key = VAPIDKey.query.first() |
|
|
|
|
|
return jsonify(vapidKey=key.public_key) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/web-push/subscribe', methods=['POST']) |
|
|
@app.route('/web-push/subscribe', methods=['POST']) |
|
@ -128,4 +150,3 @@ def create_app(config_name): |
|
|
initialize() |
|
|
initialize() |
|
|
|
|
|
|
|
|
return app |
|
|
return app |
|
|
|
|
|
|
|
|