forked from jsnbuchanan/crowd-funder-for-time-pwa
- Add full DID registration flow matching TypeScript version - Implement ES256K JWT signing with PEM key format - Add async/await support for JWT operations - Improve error handling and debug output - Add rich documentation and type hints Technical Changes: - Convert private key to PEM format for JWT signing - Match TypeScript's JWT payload structure - Add proper JWT header with ES256K algorithm - Implement async functions for JWT creation - Add detailed debug output for JWT parts Documentation: - Add module-level docstring with flow description - Add function-level docstrings with examples - Document security considerations - Add technical details and error handling info Dependencies: - Add cryptography for key format conversion - Add jwcrypto for JWT operations - Update requirements.txt with versions and comments This commit implements the complete DID registration flow, matching the TypeScript implementation's behavior and adding comprehensive documentation and error handling.
403 lines
12 KiB
Python
403 lines
12 KiB
Python
"""
|
|
DID Creation and Registration Flow
|
|
@author Matthew Raymer
|
|
|
|
This module implements the creation and registration of Decentralized Identifiers (DIDs)
|
|
with the endorser.ch service. It handles the complete workflow from mnemonic generation
|
|
to DID registration.
|
|
|
|
Flow:
|
|
1. Generate or load mnemonic seed phrase
|
|
2. Derive Ethereum keys and address
|
|
3. Create DID identifier
|
|
4. Initialize account
|
|
5. Create signed JWT
|
|
6. Register DID with endorser service
|
|
|
|
Features:
|
|
- Secure mnemonic generation and storage
|
|
- HD wallet key derivation
|
|
- ES256K-R JWT signing
|
|
- DID registration with endorser.ch
|
|
- Persistent mnemonic storage
|
|
|
|
Dependencies:
|
|
- mnemonic: For seed phrase generation
|
|
- eth_account: For Ethereum account operations
|
|
- eth_keys: For key manipulation
|
|
- web3: For Ethereum utilities
|
|
- requests: For API communication
|
|
|
|
Usage:
|
|
python new_flow.py
|
|
|
|
Environment:
|
|
API_SERVER: Endorser API endpoint (defaults to test server)
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from mnemonic import Mnemonic
|
|
from eth_account import Account
|
|
from eth_account.hdaccount import generate_mnemonic
|
|
import jwt
|
|
from eth_account._utils.signing import sign_message_hash
|
|
from eth_utils.curried import keccak
|
|
import requests
|
|
from typing import Tuple, Dict, Any
|
|
import os
|
|
from web3 import Web3
|
|
from eth_keys import keys
|
|
import base64
|
|
import json
|
|
from jwcrypto import jwk
|
|
import asyncio
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
|
|
# Constants
|
|
DEFAULT_ROOT_DERIVATION_PATH = "m/84737769'/0'/0'/0'" # Custom derivation path for TimeSafari
|
|
API_SERVER = "https://test-api.endorser.ch" # Endorser API endpoint
|
|
ENDORSER_DID = "did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F" # Keep original case
|
|
ENDORSER_PRIVATE_KEY = "2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b"
|
|
|
|
|
|
def derive_address(
|
|
mnemonic: str,
|
|
derivation_path: str = DEFAULT_ROOT_DERIVATION_PATH) -> Tuple[str, str, str, str]:
|
|
"""
|
|
Derive Ethereum address and keys from a mnemonic phrase.
|
|
|
|
Args:
|
|
mnemonic (str): The mnemonic seed phrase
|
|
derivation_path (str): HD wallet derivation path
|
|
|
|
Returns:
|
|
Tuple[str, str, str, str]: (address, private_key_hex, public_key_hex, derivation_path)
|
|
"""
|
|
# Enable HD wallet features before using mnemonic
|
|
Account.enable_unaudited_hdwallet_features()
|
|
|
|
mnemonic = mnemonic.strip().lower()
|
|
seed = Mnemonic("english").to_seed(mnemonic)
|
|
|
|
# Derive the account using eth-account
|
|
account = Account.from_mnemonic(mnemonic, account_path=derivation_path)
|
|
address = account.address
|
|
private_key = account.key
|
|
|
|
# Format private key without '0x' prefix
|
|
private_hex = private_key.hex()[2:] if private_key.hex().startswith('0x') else private_key.hex()
|
|
|
|
# Derive public key from private key
|
|
pk = keys.PrivateKey(private_key)
|
|
public_key = pk.public_key
|
|
public_hex = public_key.to_hex()[2:] # Remove '0x' prefix
|
|
|
|
return address, private_hex, public_hex, derivation_path
|
|
|
|
def new_identifier(
|
|
address: str,
|
|
public_key_hex: str,
|
|
private_key_hex: str,
|
|
derivation_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Create a new DID identifier with associated keys.
|
|
|
|
Args:
|
|
address (str): Ethereum address
|
|
public_key_hex (str): Public key in hex format
|
|
private_key_hex (str): Private key in hex format
|
|
derivation_path (str): HD wallet derivation path
|
|
|
|
Returns:
|
|
Dict[str, Any]: DID identifier object with keys and services
|
|
"""
|
|
did = f"did:ethr:{address}"
|
|
key_id = f"{did}#keys-1"
|
|
|
|
identifier = {
|
|
"did": did,
|
|
"keys": [{
|
|
"id": key_id,
|
|
"type": "Secp256k1VerificationKey2018",
|
|
"controller": did,
|
|
"ethereumAddress": address,
|
|
"publicKeyHex": public_key_hex,
|
|
"privateKeyHex": private_key_hex
|
|
}],
|
|
"services": []
|
|
}
|
|
return identifier
|
|
|
|
def initialize_account() -> Dict[str, Any]:
|
|
"""
|
|
Initialize a new DID account or load existing one.
|
|
|
|
This function either generates a new mnemonic or loads an existing one
|
|
from mnemonic.txt, then derives the necessary keys and creates a DID.
|
|
|
|
Returns:
|
|
Dict[str, Any]: The initialized DID identity object
|
|
|
|
Side effects:
|
|
- Creates/reads mnemonic.txt file
|
|
- Prints account initialization details
|
|
"""
|
|
Account.enable_unaudited_hdwallet_features()
|
|
|
|
# Generate or load mnemonic
|
|
if not os.path.exists('mnemonic.txt'):
|
|
mnemonic = generate_mnemonic(lang="english", num_words=24)
|
|
with open('mnemonic.txt', 'w', encoding='utf-8') as f:
|
|
f.write(mnemonic)
|
|
else:
|
|
with open('mnemonic.txt', 'r', encoding='utf-8') as f:
|
|
mnemonic = f.read().strip()
|
|
|
|
# Derive keys and create identity
|
|
address, private_hex, public_hex, derivation_path = derive_address(mnemonic)
|
|
|
|
# Create DID identifier
|
|
identity = new_identifier(address, public_hex, private_hex, derivation_path)
|
|
|
|
# Format account data
|
|
account_data = {
|
|
"did": identity["did"],
|
|
"identity": identity,
|
|
"mnemonic": mnemonic,
|
|
"derivation_path": derivation_path
|
|
}
|
|
print("\nAccount initialized:")
|
|
print(json.dumps(account_data, indent=2))
|
|
print()
|
|
return identity
|
|
|
|
async def create_endorser_jwt(
|
|
did: str,
|
|
private_key_hex: str,
|
|
payload: Dict[str, Any],
|
|
sub_did: str = None,
|
|
expires_in: int = 3600) -> str:
|
|
"""Create a signed JWT for DID registration."""
|
|
# Prepare JWT headers and payload
|
|
headers = {
|
|
"typ": "JWT",
|
|
"alg": "ES256K"
|
|
}
|
|
|
|
current_time = int(time.time())
|
|
jwt_payload = {
|
|
"iat": current_time,
|
|
"exp": current_time + expires_in,
|
|
"iss": did,
|
|
**payload
|
|
}
|
|
|
|
# Convert private key to PEM format
|
|
private_key_int = int(private_key_hex, 16)
|
|
private_key = ec.derive_private_key(
|
|
private_key_int,
|
|
ec.SECP256K1()
|
|
)
|
|
|
|
# Serialize to PEM
|
|
pem_private = private_key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
)
|
|
|
|
# Sign using PyJWT with custom signing
|
|
token = jwt.encode(
|
|
jwt_payload,
|
|
pem_private,
|
|
algorithm="ES256K",
|
|
headers=headers
|
|
)
|
|
|
|
return token
|
|
|
|
# Step 6: Create and submit a registration claim
|
|
async def register(
|
|
active_did: str,
|
|
private_key_hex: str,
|
|
api_server: str = API_SERVER) -> Dict[str, Any]:
|
|
"""
|
|
Register a DID with the endorser service by submitting a signed verifiable credential.
|
|
|
|
This function creates and submits a registration claim to the endorser service. The process:
|
|
1. Creates a Verifiable Credential (VC) for registration
|
|
2. Signs the VC using ES256K-R JWT format
|
|
3. Submits the signed JWT to the endorser API
|
|
4. Handles the response and any potential errors
|
|
|
|
Args:
|
|
active_did (str): The DID to register (e.g., "did:ethr:0x...")
|
|
private_key_hex (str): Private key in hex format for signing the JWT
|
|
api_server (str, optional): Endorser API endpoint. Defaults to API_SERVER.
|
|
|
|
Returns:
|
|
Dict[str, Any]: Registration result with either:
|
|
{"success": True} for successful registration
|
|
{"error": str, "details": Dict} for failures
|
|
|
|
Raises:
|
|
requests.RequestException: For network or API communication errors
|
|
|
|
Example Success:
|
|
{
|
|
"success": True
|
|
}
|
|
|
|
Example Failure:
|
|
{
|
|
"error": "Registration failed",
|
|
"details": {
|
|
"error": {
|
|
"message": "JWT verification failed",
|
|
"code": "JWT_VERIFY_FAILED"
|
|
}
|
|
}
|
|
}
|
|
|
|
Technical Details:
|
|
- Uses ES256K-R JWT signing
|
|
- Follows W3C Verifiable Credentials data model
|
|
- Implements proper error handling and reporting
|
|
- Supports custom API endpoints for testing
|
|
"""
|
|
print("\nEndorser DID:", ENDORSER_DID)
|
|
print("Active DID:", active_did)
|
|
|
|
# Create registration claim matching TypeScript version
|
|
vcPayload = {
|
|
"vc": {
|
|
"@context": ["https://www.w3.org/2018/credentials/v1"],
|
|
"type": ["VerifiableCredential"],
|
|
"credentialSubject": {
|
|
"@context": "https://schema.org",
|
|
"@type": "RegisterAction",
|
|
"agent": {
|
|
"identifier": ENDORSER_DID
|
|
},
|
|
"participant": {
|
|
"identifier": active_did
|
|
},
|
|
"object": "endorser.ch",
|
|
"endTime": int(time.time()) + 7*24*60*60 # Next week epoch
|
|
}
|
|
}
|
|
}
|
|
|
|
# Debug output
|
|
print("\nRegistration Claim:")
|
|
print(json.dumps(vcPayload, indent=2))
|
|
print()
|
|
|
|
# Sign with endorser's DID and private key
|
|
jwt_token = await create_endorser_jwt(
|
|
did=ENDORSER_DID,
|
|
private_key_hex=ENDORSER_PRIVATE_KEY,
|
|
payload=vcPayload,
|
|
sub_did=active_did
|
|
)
|
|
|
|
# Debug output
|
|
print("Generated JWT:")
|
|
print(jwt_token)
|
|
print()
|
|
|
|
# Debug JWT parts
|
|
header, payload, signature = jwt_token.split('.')
|
|
print("\nJWT Header:", json.loads(base64.urlsafe_b64decode(header + '==').decode()))
|
|
print("JWT Payload:", json.loads(base64.urlsafe_b64decode(payload + '==').decode()))
|
|
print("JWT Signature length:", len(base64.urlsafe_b64decode(signature + '==')), "bytes")
|
|
print()
|
|
|
|
try:
|
|
response = requests.post(
|
|
f"{api_server}/api/v2/claim",
|
|
json={"jwtEncoded": jwt_token},
|
|
headers={"Content-Type": "application/json"}
|
|
)
|
|
response_data = response.json()
|
|
|
|
if response.status_code == 200 and response_data.get("success", {}).get("handleId"):
|
|
return {
|
|
"success": True,
|
|
"handleId": response_data["success"]["handleId"],
|
|
"registrationId": response_data["success"]["registrationId"],
|
|
"claimId": response_data["success"]["claimId"]
|
|
}
|
|
|
|
# Log full error response
|
|
print("\nAPI Error Response:")
|
|
print(json.dumps(response_data, indent=2))
|
|
print()
|
|
|
|
# Handle partial success
|
|
if response_data.get("success"):
|
|
return {
|
|
"success": True,
|
|
**response_data["success"]
|
|
}
|
|
|
|
return {"error": "Registration failed", "details": response_data}
|
|
|
|
except requests.RequestException as e:
|
|
print(f"\nAPI Request Error: {str(e)}")
|
|
return {"error": f"Error submitting claim: {str(e)}"}
|
|
|
|
# Main execution
|
|
async def main():
|
|
"""
|
|
Main execution flow for DID creation and registration.
|
|
|
|
This function orchestrates the complete DID lifecycle:
|
|
1. Creates or loads a DID identity
|
|
2. Extracts the necessary credentials
|
|
3. Registers the DID with the endorser service
|
|
|
|
The process involves:
|
|
- Generating/loading a mnemonic seed phrase
|
|
- Deriving Ethereum keys and address
|
|
- Creating a DID identifier
|
|
- Signing registration claims
|
|
- Submitting to the endorser API
|
|
|
|
Side Effects:
|
|
- Creates mnemonic.txt if it doesn't exist
|
|
- Prints account initialization details
|
|
- Prints registration results
|
|
|
|
Error Handling:
|
|
- Handles API communication errors
|
|
- Reports registration failures
|
|
- Provides detailed error messages
|
|
|
|
Example Output:
|
|
```
|
|
Account initialized:
|
|
{
|
|
"did": "did:ethr:0x...",
|
|
"identity": {...},
|
|
"mnemonic": "word1 word2 ...",
|
|
"derivation_path": "m/84737769'/0'/0'/0'"
|
|
}
|
|
|
|
Registration result: {"success": true}
|
|
```
|
|
"""
|
|
# Step 1: Create a new DID
|
|
identity = initialize_account()
|
|
active_did = identity["did"]
|
|
private_key_hex = identity["keys"][0]["privateKeyHex"]
|
|
|
|
# Step 2: Register the DID
|
|
result = await register(active_did, private_key_hex)
|
|
print("Registration result:", result)
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|