| 
						
						
						
					 | 
				
				 | 
				
					@ -1,9 +1,13 @@ | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					# Test subscription creation | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					# | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					# Setup with `pip install -r requirements.txt` and then `source venv/bin/activate` | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					from pywebpush import webpush, WebPushException | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					import json | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					import base64 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					import datetime | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					import json | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					import sys | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					# these below will only work for the browser that acquired the subscription info.  You will need to extract those from our console OR the db. | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				
					@ -12,9 +16,11 @@ import base64 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					# sVR_s8J4JHv3h4ZmvemL5w | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					private_key_hex = "308187020100301306072a8648ce3d020106082a8648ce3d030107046d306b0201010420ac868a9588a69ec9626db857caae42b0d654288abf73b0d8f1a6b43fff093508a1440342000466da453c1e793d7e21ab63b334e80f96715aa97e578639b5fe8092f8752fa6e1f9182324846c70bf1a3480411ba787e652be8049a36a14294681f745c4c4c4f7" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					private_key_der = bytes.fromhex(private_key_hex) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					private_key_base64 = base64.b64encode(private_key_der).decode() | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					subscription_info = { | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				
					@ -25,18 +31,29 @@ subscription_info = { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    } | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					} | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					data = json.dumps({"title": "test", "message": "here is a message"}) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					# set subscription_info from arg 1 (sqlite results) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					if len(sys.argv) > 1: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    argument = sys.argv[1] | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    parts = argument.split('|') | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    if len(parts) > 0: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					        subscription_info['endpoint'] = parts[0] | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					        if len(parts) > 1: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					            subscription_info['keys']['p256dh'] = parts[1] | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					            if len(parts) > 2: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					                subscription_info['keys']['auth'] = parts[2] | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					now = datetime.datetime.now().isoformat() | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					data = json.dumps({"title": "test", "message": f"Message at {now}"}) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					try: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    webpush(subscription_info, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    result = webpush(subscription_info, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					            data, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					#            vapid_private_key="./private_key.pem", | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					#            vapid_private_key="MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgrIaKlYimnslibbhXyq5CsNZUKIq/c7DY8aa0P/8JNQihRANCAARm2kU8Hnk9fiGrY7M06A+WcVqpfleGObX+gJL4dS+m4fkYIySEbHC/GjSAQRunh+ZSvoBJo2oUKUaB90XExMT3", | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					            vapid_private_key=private_key_base64, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					            vapid_claims={"sub": "mailto:matthew.raymer@gmail.com"}) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    print(f"Result from remote service: {result}") | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					except WebPushException as ex: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    print(f"An error occurred: {ex}") | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    # Check if there is a response from the remote service. | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					    if ex.response: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					        response_data = ex.response.json() | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					        print(f"Response from remote service: {response_data}") | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					        print(f"Error response from remote service: {response_data}") | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
					 | 
				
				 | 
				
					
  |