forked from jsnbuchanan/crowd-funder-for-time-pwa
- Add fetch_claim method to match TypeScript version - Implement JWT authentication for claim fetching - Update main flow to fetch claim after registration - Add error handling and logging - Match TypeScript API structure This adds the ability to fetch claim details after successful DID registration, completing the full registration and verification flow.
466 lines
14 KiB
Python
466 lines
14 KiB
Python
"""
|
|
DID Creation and Registration Flow
|
|
@author Matthew Raymer
|
|
|
|
This module implements the creation and registration of Decentralized Identifiers (DIDs)
|
|
with the endorser.ch service. It handles the complete workflow from mnemonic generation
|
|
to DID registration.
|
|
|
|
Flow:
|
|
1. Generate or load mnemonic seed phrase
|
|
2. Derive Ethereum keys and address
|
|
3. Create DID identifier
|
|
4. Initialize account
|
|
5. Create signed JWT
|
|
6. Register DID with endorser service
|
|
|
|
Features:
|
|
- Secure mnemonic generation and storage
|
|
- HD wallet key derivation
|
|
- ES256K-R JWT signing
|
|
- DID registration with endorser.ch
|
|
- Persistent mnemonic storage
|
|
|
|
Dependencies:
|
|
- mnemonic: For seed phrase generation
|
|
- eth_account: For Ethereum account operations
|
|
- eth_keys: For key manipulation
|
|
- web3: For Ethereum utilities
|
|
- requests: For API communication
|
|
|
|
Usage:
|
|
python new_flow.py
|
|
|
|
Environment:
|
|
API_SERVER: Endorser API endpoint (defaults to test server)
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from mnemonic import Mnemonic
|
|
from eth_account import Account
|
|
from eth_account.hdaccount import generate_mnemonic
|
|
import jwt
|
|
from eth_account._utils.signing import sign_message_hash
|
|
from eth_utils.curried import keccak
|
|
import requests
|
|
from typing import Tuple, Dict, Any
|
|
import os
|
|
from web3 import Web3
|
|
from eth_keys import keys
|
|
import base64
|
|
import json
|
|
from jwcrypto import jwk
|
|
import asyncio
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
|
|
# Constants
|
|
DEFAULT_ROOT_DERIVATION_PATH = "m/84737769'/0'/0'/0'" # Custom derivation path for TimeSafari
|
|
API_SERVER = "https://test-api.endorser.ch" # Endorser API endpoint
|
|
ENDORSER_DID = "did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F" # Keep original case
|
|
ENDORSER_PRIVATE_KEY = "2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b"
|
|
|
|
|
|
def derive_address(
|
|
mnemonic: str,
|
|
derivation_path: str = DEFAULT_ROOT_DERIVATION_PATH) -> Tuple[str, str, str, str]:
|
|
"""
|
|
Derive Ethereum address and keys from a mnemonic phrase.
|
|
|
|
Args:
|
|
mnemonic (str): The mnemonic seed phrase
|
|
derivation_path (str): HD wallet derivation path
|
|
|
|
Returns:
|
|
Tuple[str, str, str, str]: (address, private_key_hex, public_key_hex, derivation_path)
|
|
"""
|
|
# Enable HD wallet features before using mnemonic
|
|
Account.enable_unaudited_hdwallet_features()
|
|
|
|
mnemonic = mnemonic.strip().lower()
|
|
seed = Mnemonic("english").to_seed(mnemonic)
|
|
|
|
# Derive the account using eth-account
|
|
account = Account.from_mnemonic(mnemonic, account_path=derivation_path)
|
|
address = account.address
|
|
private_key = account.key
|
|
|
|
# Format private key without '0x' prefix
|
|
private_hex = private_key.hex()[2:] if private_key.hex().startswith('0x') else private_key.hex()
|
|
|
|
# Derive public key from private key
|
|
pk = keys.PrivateKey(private_key)
|
|
public_key = pk.public_key
|
|
public_hex = public_key.to_hex()[2:] # Remove '0x' prefix
|
|
|
|
return address, private_hex, public_hex, derivation_path
|
|
|
|
def new_identifier(
|
|
address: str,
|
|
public_key_hex: str,
|
|
private_key_hex: str,
|
|
derivation_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Create a new DID identifier with associated keys.
|
|
|
|
Args:
|
|
address (str): Ethereum address
|
|
public_key_hex (str): Public key in hex format
|
|
private_key_hex (str): Private key in hex format
|
|
derivation_path (str): HD wallet derivation path
|
|
|
|
Returns:
|
|
Dict[str, Any]: DID identifier object with keys and services
|
|
"""
|
|
did = f"did:ethr:{address}"
|
|
key_id = f"{did}#keys-1"
|
|
|
|
identifier = {
|
|
"did": did,
|
|
"keys": [{
|
|
"id": key_id,
|
|
"type": "Secp256k1VerificationKey2018",
|
|
"controller": did,
|
|
"ethereumAddress": address,
|
|
"publicKeyHex": public_key_hex,
|
|
"privateKeyHex": private_key_hex
|
|
}],
|
|
"services": []
|
|
}
|
|
return identifier
|
|
|
|
def initialize_account() -> Dict[str, Any]:
|
|
"""
|
|
Initialize a new DID account or load existing one.
|
|
|
|
This function either generates a new mnemonic or loads an existing one
|
|
from mnemonic.txt, then derives the necessary keys and creates a DID.
|
|
|
|
Returns:
|
|
Dict[str, Any]: The initialized DID identity object
|
|
|
|
Side effects:
|
|
- Creates/reads mnemonic.txt file
|
|
- Prints account initialization details
|
|
"""
|
|
Account.enable_unaudited_hdwallet_features()
|
|
|
|
# Generate or load mnemonic
|
|
if not os.path.exists('mnemonic.txt'):
|
|
mnemonic = generate_mnemonic(lang="english", num_words=24)
|
|
with open('mnemonic.txt', 'w', encoding='utf-8') as f:
|
|
f.write(mnemonic)
|
|
else:
|
|
with open('mnemonic.txt', 'r', encoding='utf-8') as f:
|
|
mnemonic = f.read().strip()
|
|
|
|
# Derive keys and create identity
|
|
address, private_hex, public_hex, derivation_path = derive_address(mnemonic)
|
|
|
|
# Create DID identifier
|
|
identity = new_identifier(address, public_hex, private_hex, derivation_path)
|
|
|
|
# Format account data
|
|
account_data = {
|
|
"did": identity["did"],
|
|
"identity": identity,
|
|
"mnemonic": mnemonic,
|
|
"derivation_path": derivation_path
|
|
}
|
|
print("\nAccount initialized:")
|
|
print(json.dumps(account_data, indent=2))
|
|
print()
|
|
return identity
|
|
|
|
async def create_endorser_jwt(
|
|
did: str,
|
|
private_key_hex: str,
|
|
payload: Dict[str, Any],
|
|
sub_did: str = None,
|
|
expires_in: int = 3600) -> str:
|
|
"""Create a signed JWT for DID registration."""
|
|
# Prepare JWT headers and payload
|
|
headers = {
|
|
"typ": "JWT",
|
|
"alg": "ES256K"
|
|
}
|
|
|
|
current_time = int(time.time())
|
|
jwt_payload = {
|
|
"iat": current_time,
|
|
"exp": current_time + expires_in,
|
|
"iss": did,
|
|
**payload
|
|
}
|
|
|
|
# Convert private key to PEM format
|
|
private_key_int = int(private_key_hex, 16)
|
|
private_key = ec.derive_private_key(
|
|
private_key_int,
|
|
ec.SECP256K1()
|
|
)
|
|
|
|
# Serialize to PEM
|
|
pem_private = private_key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
)
|
|
|
|
# Sign using PyJWT with custom signing
|
|
token = jwt.encode(
|
|
jwt_payload,
|
|
pem_private,
|
|
algorithm="ES256K",
|
|
headers=headers
|
|
)
|
|
|
|
return token
|
|
|
|
# Step 6: Create and submit a registration claim
|
|
async def register(
|
|
active_did: str,
|
|
private_key_hex: str,
|
|
api_server: str = API_SERVER) -> Dict[str, Any]:
|
|
"""
|
|
Register a DID with the endorser service by submitting a signed verifiable credential.
|
|
|
|
This function creates and submits a registration claim to the endorser service. The process:
|
|
1. Creates a Verifiable Credential (VC) for registration
|
|
2. Signs the VC using ES256K-R JWT format
|
|
3. Submits the signed JWT to the endorser API
|
|
4. Handles the response and any potential errors
|
|
|
|
Args:
|
|
active_did (str): The DID to register (e.g., "did:ethr:0x...")
|
|
private_key_hex (str): Private key in hex format for signing the JWT
|
|
api_server (str, optional): Endorser API endpoint. Defaults to API_SERVER.
|
|
|
|
Returns:
|
|
Dict[str, Any]: Registration result with either:
|
|
{"success": True} for successful registration
|
|
{"error": str, "details": Dict} for failures
|
|
|
|
Raises:
|
|
requests.RequestException: For network or API communication errors
|
|
|
|
Example Success:
|
|
{
|
|
"success": True
|
|
}
|
|
|
|
Example Failure:
|
|
{
|
|
"error": "Registration failed",
|
|
"details": {
|
|
"error": {
|
|
"message": "JWT verification failed",
|
|
"code": "JWT_VERIFY_FAILED"
|
|
}
|
|
}
|
|
}
|
|
|
|
Technical Details:
|
|
- Uses ES256K-R JWT signing
|
|
- Follows W3C Verifiable Credentials data model
|
|
- Implements proper error handling and reporting
|
|
- Supports custom API endpoints for testing
|
|
"""
|
|
print("\nEndorser DID:", ENDORSER_DID)
|
|
print("Active DID:", active_did)
|
|
|
|
# Create registration claim matching TypeScript version
|
|
vcPayload = {
|
|
"vc": {
|
|
"@context": ["https://www.w3.org/2018/credentials/v1"],
|
|
"type": ["VerifiableCredential"],
|
|
"credentialSubject": {
|
|
"@context": "https://schema.org",
|
|
"@type": "RegisterAction",
|
|
"agent": {
|
|
"identifier": ENDORSER_DID
|
|
},
|
|
"participant": {
|
|
"identifier": active_did
|
|
},
|
|
"object": "endorser.ch",
|
|
"endTime": int(time.time()) + 7*24*60*60 # Next week epoch
|
|
}
|
|
}
|
|
}
|
|
|
|
# Debug output
|
|
print("\nRegistration Claim:")
|
|
print(json.dumps(vcPayload, indent=2))
|
|
print()
|
|
|
|
# Sign with endorser's DID and private key
|
|
jwt_token = await create_endorser_jwt(
|
|
did=ENDORSER_DID,
|
|
private_key_hex=ENDORSER_PRIVATE_KEY,
|
|
payload=vcPayload,
|
|
sub_did=active_did
|
|
)
|
|
|
|
# Debug output
|
|
print("Generated JWT:")
|
|
print(jwt_token)
|
|
print()
|
|
|
|
# Debug JWT parts
|
|
header, payload, signature = jwt_token.split('.')
|
|
print("\nJWT Header:", json.loads(base64.urlsafe_b64decode(header + '==').decode()))
|
|
print("JWT Payload:", json.loads(base64.urlsafe_b64decode(payload + '==').decode()))
|
|
print("JWT Signature length:", len(base64.urlsafe_b64decode(signature + '==')), "bytes")
|
|
print()
|
|
|
|
try:
|
|
response = requests.post(
|
|
f"{api_server}/api/v2/claim",
|
|
json={"jwtEncoded": jwt_token},
|
|
headers={"Content-Type": "application/json"}
|
|
)
|
|
response_data = response.json()
|
|
|
|
if response.status_code == 200 and response_data.get("success", {}).get("handleId"):
|
|
return {
|
|
"success": True,
|
|
"handleId": response_data["success"]["handleId"],
|
|
"registrationId": response_data["success"]["registrationId"],
|
|
"claimId": response_data["success"]["claimId"]
|
|
}
|
|
|
|
# Log full error response
|
|
print("\nAPI Error Response:")
|
|
print(json.dumps(response_data, indent=2))
|
|
print()
|
|
|
|
# Handle partial success
|
|
if response_data.get("success"):
|
|
return {
|
|
"success": True,
|
|
**response_data["success"]
|
|
}
|
|
|
|
return {"error": "Registration failed", "details": response_data}
|
|
|
|
except requests.RequestException as e:
|
|
print(f"\nAPI Request Error: {str(e)}")
|
|
return {"error": f"Error submitting claim: {str(e)}"}
|
|
|
|
async def fetch_claim(
|
|
claim_id: str,
|
|
active_did: str,
|
|
private_key_hex: str,
|
|
api_server: str = API_SERVER
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Fetch claim details from the endorser service.
|
|
|
|
Args:
|
|
claim_id (str): The ID of the claim to fetch
|
|
active_did (str): The DID making the request
|
|
private_key_hex (str): Private key for signing auth JWT
|
|
api_server (str, optional): API endpoint. Defaults to API_SERVER.
|
|
|
|
Returns:
|
|
Dict[str, Any]: The claim details
|
|
|
|
Raises:
|
|
requests.RequestException: For API communication errors
|
|
"""
|
|
# Create authentication JWT
|
|
auth_payload = {
|
|
"intent": "claim.read",
|
|
"claimId": claim_id
|
|
}
|
|
|
|
# Sign with the active DID (not endorser DID)
|
|
auth_token = await create_endorser_jwt(
|
|
active_did,
|
|
private_key_hex,
|
|
auth_payload
|
|
)
|
|
|
|
try:
|
|
response = requests.get(
|
|
f"{api_server}/api/claim/byHandle/{claim_id}",
|
|
headers={
|
|
'Authorization': f'Bearer {auth_token}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
)
|
|
return response.json()
|
|
|
|
except requests.RequestException as e:
|
|
print("\nError fetching claim:")
|
|
print(f"Error: {str(e)}")
|
|
raise
|
|
|
|
# Main execution
|
|
async def main():
|
|
"""
|
|
Main execution flow for DID creation and registration.
|
|
|
|
This function orchestrates the complete DID lifecycle:
|
|
1. Creates or loads a DID identity
|
|
2. Extracts the necessary credentials
|
|
3. Registers the DID with the endorser service
|
|
|
|
The process involves:
|
|
- Generating/loading a mnemonic seed phrase
|
|
- Deriving Ethereum keys and address
|
|
- Creating a DID identifier
|
|
- Signing registration claims
|
|
- Submitting to the endorser API
|
|
|
|
Side Effects:
|
|
- Creates mnemonic.txt if it doesn't exist
|
|
- Prints account initialization details
|
|
- Prints registration results
|
|
|
|
Error Handling:
|
|
- Handles API communication errors
|
|
- Reports registration failures
|
|
- Provides detailed error messages
|
|
|
|
Example Output:
|
|
```
|
|
Account initialized:
|
|
{
|
|
"did": "did:ethr:0x...",
|
|
"identity": {...},
|
|
"mnemonic": "word1 word2 ...",
|
|
"derivation_path": "m/84737769'/0'/0'/0'"
|
|
}
|
|
|
|
Registration result: {"success": true}
|
|
```
|
|
"""
|
|
# Step 1: Create a new DID
|
|
identity = initialize_account()
|
|
active_did = identity["did"]
|
|
private_key_hex = identity["keys"][0]["privateKeyHex"]
|
|
|
|
# Step 2: Register the DID
|
|
result = await register(active_did, private_key_hex)
|
|
print("Registration result:", result)
|
|
|
|
# Step 3: If registration successful, fetch claim details
|
|
if result.get("success") and result.get("claimId"):
|
|
print("\nFetching claim details...")
|
|
try:
|
|
claim_details = await fetch_claim(
|
|
result["claimId"],
|
|
active_did,
|
|
private_key_hex
|
|
)
|
|
print("\nClaim Details:")
|
|
print(json.dumps(claim_details, indent=2))
|
|
except Exception as e:
|
|
print(f"Error fetching claim: {str(e)}")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|