""" DID Creation and Registration Flow @author Matthew Raymer This module implements the creation and registration of Decentralized Identifiers (DIDs) with the endorser.ch service. It handles the complete workflow from mnemonic generation to DID registration. Flow: 1. Generate or load mnemonic seed phrase 2. Derive Ethereum keys and address 3. Create DID identifier 4. Initialize account 5. Create signed JWT 6. Register DID with endorser service Features: - Secure mnemonic generation and storage - HD wallet key derivation - ES256K-R JWT signing - DID registration with endorser.ch - Persistent mnemonic storage Dependencies: - mnemonic: For seed phrase generation - eth_account: For Ethereum account operations - eth_keys: For key manipulation - web3: For Ethereum utilities - requests: For API communication Usage: python new_flow.py Environment: API_SERVER: Endorser API endpoint (defaults to test server) """ import json import time from mnemonic import Mnemonic from eth_account import Account from eth_account.hdaccount import generate_mnemonic import jwt from eth_account._utils.signing import sign_message_hash from eth_utils.curried import keccak import requests from typing import Tuple, Dict, Any import os from web3 import Web3 from eth_keys import keys import base64 import json from jwcrypto import jwk import asyncio from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec from pathlib import Path # Constants DEFAULT_ROOT_DERIVATION_PATH = "m/84737769'/0'/0'/0'" # Custom derivation path for TimeSafari API_SERVER = "https://test-api.endorser.ch" # Endorser API endpoint ENDORSER_DID = "did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F" # Keep original case ENDORSER_PRIVATE_KEY = "2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b" # Create .generated directory if it doesn't exist GENERATED_DIR = Path('.generated') def initialize_directories(): """Create necessary directories for storing generated files.""" try: GENERATED_DIR.mkdir(exist_ok=True) except Exception as e: print(f"Error creating .generated directory: {e}") raise def derive_address( mnemonic: str, derivation_path: str = DEFAULT_ROOT_DERIVATION_PATH) -> Tuple[str, str, str, str]: """ Derive Ethereum address and keys from a mnemonic phrase. Args: mnemonic (str): The mnemonic seed phrase derivation_path (str): HD wallet derivation path Returns: Tuple[str, str, str, str]: (address, private_key_hex, public_key_hex, derivation_path) """ # Enable HD wallet features before using mnemonic Account.enable_unaudited_hdwallet_features() mnemonic = mnemonic.strip().lower() seed = Mnemonic("english").to_seed(mnemonic) # Derive the account using eth-account account = Account.from_mnemonic(mnemonic, account_path=derivation_path) address = account.address private_key = account.key # Format private key without '0x' prefix private_hex = private_key.hex()[2:] if private_key.hex().startswith('0x') else private_key.hex() # Derive public key from private key pk = keys.PrivateKey(private_key) public_key = pk.public_key public_hex = public_key.to_hex()[2:] # Remove '0x' prefix print(f" Address: {address}") print(f" Private Key: {private_hex[:8]}...") print(f" Public Key: {public_hex[:8]}...") # Save derivation data derivation_data = { "mnemonic": mnemonic, "derivation_path": derivation_path, "address": address, "private_key": private_hex, "public_key": public_hex } try: GENERATED_DIR.mkdir(exist_ok=True) with open(GENERATED_DIR / 'key_derivation.json', 'w') as f: json.dump(derivation_data, f, indent=2) except Exception as e: print(f"Error saving derivation data: {e}") return address, private_hex, public_hex, derivation_path def new_identifier( address: str, public_key_hex: str, private_key_hex: str, derivation_path: str) -> Dict[str, Any]: """ Create a new DID identifier with associated keys. Args: address (str): Ethereum address public_key_hex (str): Public key in hex format private_key_hex (str): Private key in hex format derivation_path (str): HD wallet derivation path Returns: Dict[str, Any]: DID identifier object with keys and services """ did = f"did:ethr:{address}" key_id = f"{did}#keys-1" identifier = { "did": did, "keys": [{ "id": key_id, "type": "Secp256k1VerificationKey2018", "controller": did, "ethereumAddress": address, "publicKeyHex": public_key_hex, "privateKeyHex": private_key_hex }], "services": [] } return identifier def initialize_account() -> Dict[str, Any]: """ Initialize a new DID account or load existing one. This function either generates a new mnemonic or loads an existing one from mnemonic.txt, then derives the necessary keys and creates a DID. Returns: Dict[str, Any]: The initialized DID identity object Side effects: - Creates/reads mnemonic.txt file - Prints account initialization details """ Account.enable_unaudited_hdwallet_features() # Generate or load mnemonic if not os.path.exists('mnemonic.txt'): mnemonic = generate_mnemonic(lang="english", num_words=24) with open('mnemonic.txt', 'w', encoding='utf-8') as f: f.write(mnemonic) else: with open('mnemonic.txt', 'r', encoding='utf-8') as f: mnemonic = f.read().strip() # Derive keys and create identity address, private_hex, public_hex, derivation_path = derive_address(mnemonic) # Create DID identifier identity = new_identifier(address, public_hex, private_hex, derivation_path) # Save account data account_data = { "did": identity["did"], "identity": identity, "mnemonic": mnemonic, "derivation_path": derivation_path } with open(GENERATED_DIR / 'account_init.json', 'w') as f: json.dump(account_data, f, indent=2) print("\nAccount initialized:") print(json.dumps(account_data, indent=2)) print() return identity async def create_endorser_jwt( did: str, private_key_hex: str, payload: Dict[str, Any], sub_did: str = None, expires_in: int = 3600) -> str: """Create a signed JWT for DID registration.""" # Prepare JWT headers and payload headers = { "typ": "JWT", "alg": "ES256K" } current_time = int(time.time()) jwt_payload = { "iat": current_time, "exp": current_time + expires_in, "iss": did, **payload } # Convert private key to PEM format private_key_int = int(private_key_hex, 16) private_key = ec.derive_private_key( private_key_int, ec.SECP256K1() ) # Serialize to PEM pem_private = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) # Sign using PyJWT with custom signing token = jwt.encode( jwt_payload, pem_private, algorithm="ES256K", headers=headers ) return token # Step 6: Create and submit a registration claim async def register( active_did: str, private_key_hex: str, api_server: str = API_SERVER) -> Dict[str, Any]: """ Register a DID with the endorser service by submitting a signed verifiable credential. This function creates and submits a registration claim to the endorser service. The process: 1. Creates a Verifiable Credential (VC) for registration 2. Signs the VC using ES256K-R JWT format 3. Submits the signed JWT to the endorser API 4. Handles the response and any potential errors Args: active_did (str): The DID to register (e.g., "did:ethr:0x...") private_key_hex (str): Private key in hex format for signing the JWT api_server (str, optional): Endorser API endpoint. Defaults to API_SERVER. Returns: Dict[str, Any]: Registration result with either: {"success": True} for successful registration {"error": str, "details": Dict} for failures Raises: requests.RequestException: For network or API communication errors Example Success: { "success": True } Example Failure: { "error": "Registration failed", "details": { "error": { "message": "JWT verification failed", "code": "JWT_VERIFY_FAILED" } } } Technical Details: - Uses ES256K-R JWT signing - Follows W3C Verifiable Credentials data model - Implements proper error handling and reporting - Supports custom API endpoints for testing """ print("\nEndorser DID:", ENDORSER_DID) print("Active DID:", active_did) # Create registration claim matching TypeScript version vcPayload = { "vc": { "@context": ["https://www.w3.org/2018/credentials/v1"], "type": ["VerifiableCredential"], "credentialSubject": { "@context": "https://schema.org", "@type": "RegisterAction", "agent": { "identifier": ENDORSER_DID }, "participant": { "identifier": active_did }, "object": "endorser.ch", "endTime": int(time.time()) + 7*24*60*60 # Next week epoch } } } # Debug output print("\nRegistration Claim:") print(json.dumps(vcPayload, indent=2)) print() # Sign with endorser's DID and private key jwt_token = await create_endorser_jwt( did=ENDORSER_DID, private_key_hex=ENDORSER_PRIVATE_KEY, payload=vcPayload, sub_did=active_did ) # Debug output print("Generated JWT:") print(jwt_token) print() # Debug JWT parts header, payload, signature = jwt_token.split('.') print("\nJWT Header:", json.loads(base64.urlsafe_b64decode(header + '==').decode())) print("JWT Payload:", json.loads(base64.urlsafe_b64decode(payload + '==').decode())) print("JWT Signature length:", len(base64.urlsafe_b64decode(signature + '==')), "bytes") print() try: response = requests.post( f"{api_server}/api/v2/claim", json={"jwtEncoded": jwt_token}, headers={"Content-Type": "application/json"} ) response_data = response.json() if response.status_code == 200 and response_data.get("success", {}).get("handleId"): # Save registration data registration_data = { "active_did": active_did, "jwt_token": jwt_token, "response": response_data } with open(GENERATED_DIR / 'registration.json', 'w') as f: json.dump(registration_data, f, indent=2) return { "success": True, "handleId": response_data["success"]["handleId"], "registrationId": response_data["success"]["registrationId"], "claimId": response_data["success"]["claimId"] } # Log full error response print("\nAPI Error Response:") print(json.dumps(response_data, indent=2)) print() # Handle partial success if response_data.get("success"): return { "success": True, **response_data["success"] } return {"error": "Registration failed", "details": response_data} except requests.RequestException as e: print(f"\nAPI Request Error: {str(e)}") return {"error": f"Error submitting claim: {str(e)}"} async def fetch_claim( claim_id: str, active_did: str, private_key_hex: str, api_server: str = API_SERVER ) -> Dict[str, Any]: """ Fetch claim details from the endorser service. Args: claim_id (str): The ID of the claim to fetch active_did (str): The DID making the request private_key_hex (str): Private key for signing auth JWT api_server (str, optional): API endpoint. Defaults to API_SERVER. Returns: Dict[str, Any]: The claim details Raises: requests.RequestException: For API communication errors """ # Create authentication JWT auth_payload = { "intent": "claim.read", "claimId": claim_id } # Sign with the active DID (not endorser DID) auth_token = await create_endorser_jwt( active_did, private_key_hex, auth_payload ) try: response = requests.get( f"{api_server}/api/claim/byHandle/{claim_id}", headers={ 'Authorization': f'Bearer {auth_token}', 'Content-Type': 'application/json' }, timeout=30 # 30 second timeout ) claim_data = response.json() # Save claim data try: with open(GENERATED_DIR / 'claim_details.json', 'w', encoding='utf-8') as f: json.dump({ 'claim_id': claim_id, 'active_did': active_did, 'response': claim_data }, f, indent=2) except Exception as e: print(f"Error saving claim data: {e}") return claim_data except requests.RequestException as e: print("\nError fetching claim:") print(f"Error: {str(e)}") # Save error state try: with open( GENERATED_DIR / 'claim_details.json', 'w', encoding='utf-8') as f: json.dump({ 'claim_id': claim_id, 'active_did': active_did, 'error': str(e) }, f, indent=2) except Exception as write_err: print(f"Error saving claim error: {write_err}") raise async def generate_test_env(): """Generate test environment data for deeplink testing""" test_env_data = { 'CONTACT1_DID': active_did, 'CONTACT1_KEY': private_key_hex, 'CONTACT2_DID': recipient_did, 'CONTACT2_KEY': recipient_key, 'ISSUER_DID': issuer_did, 'ISSUER_KEY': issuer_key, 'TEST_JWT': jwt_token, 'API_SERVER': API_SERVER } # Write test environment variables with open(GENERATED_DIR / 'test-env.sh', 'w', encoding='utf-8') as f: for key, value in test_env_data.items(): f.write(f'export {key}="{value}"\n') # Write test contacts data contacts_data = { 'contacts': [ {'did': active_did, 'name': 'Test Contact 1'}, {'did': recipient_did, 'name': 'Test Contact 2'} ] } with open(GENERATED_DIR / 'contacts.json', 'w', encoding='utf-8') as f: json.dump(contacts_data, f, indent=2) # Main execution async def main(): """ Main execution flow for DID creation and registration. This function orchestrates the complete DID lifecycle: 1. Creates or loads a DID identity 2. Extracts the necessary credentials 3. Registers the DID with the endorser service The process involves: - Generating/loading a mnemonic seed phrase - Deriving Ethereum keys and address - Creating a DID identifier - Signing registration claims - Submitting to the endorser API Side Effects: - Creates mnemonic.txt if it doesn't exist - Prints account initialization details - Prints registration results Error Handling: - Handles API communication errors - Reports registration failures - Provides detailed error messages Example Output: ``` Account initialized: { "did": "did:ethr:0x...", "identity": {...}, "mnemonic": "word1 word2 ...", "derivation_path": "m/84737769'/0'/0'/0'" } Registration result: {"success": true} ``` """ # Initialize directories first initialize_directories() try: # Step 1: Create a new DID identity = initialize_account() active_did = identity["did"] private_key_hex = identity["keys"][0]["privateKeyHex"] # Step 2: Register the DID result = await register(active_did, private_key_hex) print("Registration result:", result) # Step 3: If registration successful, fetch claim details if result.get("success") and result.get("claimId"): print("\nFetching claim details...") try: claim_details = await fetch_claim( result["claimId"], active_did, private_key_hex ) print("\nClaim Details:") print(json.dumps(claim_details, indent=2)) except Exception as e: print(f"Error fetching claim: {str(e)}") # Step 4: Generate test environment data await generate_test_env() except Exception as e: print(f"Error: {str(e)}") if __name__ == "__main__": asyncio.run(main())