Browse Source

Raw DER in base64

pull/1/head
Matthew Raymer 1 year ago
parent
commit
bdbbc09376
  1. 1
      Dockerfile
  2. 19
      app.py
  3. 34
      webpush.py

1
Dockerfile

@ -6,6 +6,7 @@ RUN apk add --no-cache --virtual .build-deps build-base git
RUN apk add --upgrade --no-cache bash sqlite libffi-dev tzdata RUN apk add --upgrade --no-cache bash sqlite libffi-dev tzdata
ENV TZ America/New_York ENV TZ America/New_York
ENV PYTHONUNBUFFERED 1
# Set the working directory in the container to /app # Set the working directory in the container to /app
WORKDIR /app WORKDIR /app

19
app.py

@ -48,20 +48,27 @@ def create_app(config_name):
def send_push_notification(subscription_info, message, vapid_key): def send_push_notification(subscription_info, message, vapid_key):
result = True
try: try:
private_key_bytes = bytes.fromhex(vapid_key.private_key) private_key_hex = vapid_key.private_key
private_key_der = bytes.fromhex(private_key_hex)
private_key_base64 = base64.b64encode(private_key_der).decode()
webpush( webpush(
subscription_info=subscription_info, subscription_info=subscription_info,
data=json.dumps(message), data=json.dumps(message),
vapid_private_key=private_key_bytes, vapid_private_key=private_key_base64,
vapid_claims={ vapid_claims={
"sub": "mailto:your-email@example.com" "sub": "mailto:your-email@example.com"
} }
) )
except WebPushException as e: except Exception as e:
result = False
print(f"Failed to send notification: {e}") print(f"Failed to send notification: {e}")
return result
def is_valid_base64_url(s): def is_valid_base64_url(s):
@ -127,9 +134,9 @@ def create_app(config_name):
} }
message = {"title": "Subscription Successful", "body": "Thank you for subscribing!"} message = {"title": "Subscription Successful", "body": "Thank you for subscribing!"}
send_push_notification(subscription_info, message, vapid_key) success = send_push_notification(subscription_info, message, vapid_key)
return jsonify(success=True) return jsonify(success=success, message=vapid_key.private_key)
@app.route('/web-push/unsubscribe', methods=['DELETE']) @app.route('/web-push/unsubscribe', methods=['DELETE'])

34
webpush.py

@ -0,0 +1,34 @@
from pywebpush import webpush, WebPushException
import json
# these below will only work for the browser that acquired the subscription info. You will need to extract those from our console OR the db.
# https://fcm.googleapis.com/fcm/send/e5jo1pnUZPA:APA91bEwHV2DeLybHc23G_-zJ4FXIdK_fh16ANVOrBekDeKNdmvZ0ukkPXsYGvq629KR2_ZK2w6rQHwC6hkbZ6Xqg0HktarvFT7wTQPjq7bkLgqqjOoONYZDPriDd9DvkD2vamzrPoBx
# BCj38CJs7M9rXDnZLBVU-XHYrAIdqCuTdscC_Waj96Z2KcMuTHI46kKssjmNwdkIae3Plr__UXbisdC5YPsIeFA
# 8cyYFZDqjGiZ4nBMfa2L8Q
# MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgrIaKlYimnslibbhXyq5CsNZUKIq/c7DY8aa0P/8JNQihRANCAARm2kU8Hnk9fiGrY7M06A+WcVqpfleGObX+gJL4dS+m4fkYIySEbHC/GjSAQRunh+ZSvoBJo2oUKUaB90XExMT3
subscription_info = {
"endpoint": "https://fcm.googleapis.com/fcm/send/e5jo1pnUZPA:APA91bEwHV2DeLybHc23G_-zJ4FXIdK_fh16ANVOrBekDeKNdmvZ0ukkPXsYGvq629KR2_ZK2w6rQHwC6hkbZ6Xqg0HktarvFT7wTQPjq7bkLgqqjOoONYZDPriDd9DvkD2vamzrPoBx",
"keys": {
"p256dh": "BCj38CJs7M9rXDnZLBVU-XHYrAIdqCuTdscC_Waj96Z2KcMuTHI46kKssjmNwdkIae3Plr__UXbisdC5YPsIeFA",
"auth": "8cyYFZDqjGiZ4nBMfa2L8Q"
}
}
data = json.dumps({"title": "test", "message": "here is a message"})
try:
webpush(subscription_info,
data,
# vapid_private_key="./private_key.pem",
vapid_private_key="MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgrIaKlYimnslibbhXyq5CsNZUKIq/c7DY8aa0P/8JNQihRANCAARm2kU8Hnk9fiGrY7M06A+WcVqpfleGObX+gJL4dS+m4fkYIySEbHC/GjSAQRunh+ZSvoBJo2oUKUaB90XExMT3",
vapid_claims={"sub": "mailto:matthew.raymer@gmail.com"})
except WebPushException as ex:
print(f"An error occurred: {ex}")
# Check if there is a response from the remote service.
if ex.response:
response_data = ex.response.json()
print(f"Response from remote service: {response_data}")
Loading…
Cancel
Save