You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

469 lines
14 KiB

"""
DID Creation and Registration Flow
@author Matthew Raymer
This module implements the creation and registration of Decentralized Identifiers (DIDs)
with the endorser.ch service. It handles the complete workflow from mnemonic generation
to DID registration.
Flow:
1. Generate or load mnemonic seed phrase
2. Derive Ethereum keys and address
3. Create DID identifier
4. Initialize account
5. Create signed JWT
6. Register DID with endorser service
Features:
- Secure mnemonic generation and storage
- HD wallet key derivation
- ES256K-R JWT signing
- DID registration with endorser.ch
- Persistent mnemonic storage
Dependencies:
- mnemonic: For seed phrase generation
- eth_account: For Ethereum account operations
- eth_keys: For key manipulation
- web3: For Ethereum utilities
- requests: For API communication
Usage:
python new_flow.py
Environment:
API_SERVER: Endorser API endpoint (defaults to test server)
"""
import json
import time
from mnemonic import Mnemonic
from eth_account import Account
from eth_account.hdaccount import generate_mnemonic
import jwt
from eth_account._utils.signing import sign_message_hash
from eth_utils.curried import keccak
import requests
from typing import Tuple, Dict, Any
import os
from web3 import Web3
from eth_keys import keys
import base64
import json
from jwcrypto import jwk
import asyncio
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
# Constants
DEFAULT_ROOT_DERIVATION_PATH = "m/84737769'/0'/0'/0'" # Custom derivation path for TimeSafari
API_SERVER = "https://test-api.endorser.ch" # Endorser API endpoint
ENDORSER_DID = "did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F" # Keep original case
ENDORSER_PRIVATE_KEY = "2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b"
def derive_address(
mnemonic: str,
derivation_path: str = DEFAULT_ROOT_DERIVATION_PATH) -> Tuple[str, str, str, str]:
"""
Derive Ethereum address and keys from a mnemonic phrase.
Args:
mnemonic (str): The mnemonic seed phrase
derivation_path (str): HD wallet derivation path
Returns:
Tuple[str, str, str, str]: (address, private_key_hex, public_key_hex, derivation_path)
"""
# Enable HD wallet features before using mnemonic
Account.enable_unaudited_hdwallet_features()
mnemonic = mnemonic.strip().lower()
seed = Mnemonic("english").to_seed(mnemonic)
# Derive the account using eth-account
account = Account.from_mnemonic(mnemonic, account_path=derivation_path)
address = account.address
private_key = account.key
# Format private key without '0x' prefix
private_hex = private_key.hex()[2:] if private_key.hex().startswith('0x') else private_key.hex()
# Derive public key from private key
pk = keys.PrivateKey(private_key)
public_key = pk.public_key
public_hex = public_key.to_hex()[2:] # Remove '0x' prefix
print(f" Address: {address}")
print(f" Private Key: {private_hex[:8]}...")
print(f" Public Key: {public_hex[:8]}...")
return address, private_hex, public_hex, derivation_path
def new_identifier(
address: str,
public_key_hex: str,
private_key_hex: str,
derivation_path: str) -> Dict[str, Any]:
"""
Create a new DID identifier with associated keys.
Args:
address (str): Ethereum address
public_key_hex (str): Public key in hex format
private_key_hex (str): Private key in hex format
derivation_path (str): HD wallet derivation path
Returns:
Dict[str, Any]: DID identifier object with keys and services
"""
did = f"did:ethr:{address}"
key_id = f"{did}#keys-1"
identifier = {
"did": did,
"keys": [{
"id": key_id,
"type": "Secp256k1VerificationKey2018",
"controller": did,
"ethereumAddress": address,
"publicKeyHex": public_key_hex,
"privateKeyHex": private_key_hex
}],
"services": []
}
return identifier
def initialize_account() -> Dict[str, Any]:
"""
Initialize a new DID account or load existing one.
This function either generates a new mnemonic or loads an existing one
from mnemonic.txt, then derives the necessary keys and creates a DID.
Returns:
Dict[str, Any]: The initialized DID identity object
Side effects:
- Creates/reads mnemonic.txt file
- Prints account initialization details
"""
Account.enable_unaudited_hdwallet_features()
# Generate or load mnemonic
if not os.path.exists('mnemonic.txt'):
mnemonic = generate_mnemonic(lang="english", num_words=24)
with open('mnemonic.txt', 'w', encoding='utf-8') as f:
f.write(mnemonic)
else:
with open('mnemonic.txt', 'r', encoding='utf-8') as f:
mnemonic = f.read().strip()
# Derive keys and create identity
address, private_hex, public_hex, derivation_path = derive_address(mnemonic)
# Create DID identifier
identity = new_identifier(address, public_hex, private_hex, derivation_path)
# Format account data
account_data = {
"did": identity["did"],
"identity": identity,
"mnemonic": mnemonic,
"derivation_path": derivation_path
}
print("\nAccount initialized:")
print(json.dumps(account_data, indent=2))
print()
return identity
async def create_endorser_jwt(
did: str,
private_key_hex: str,
payload: Dict[str, Any],
sub_did: str = None,
expires_in: int = 3600) -> str:
"""Create a signed JWT for DID registration."""
# Prepare JWT headers and payload
headers = {
"typ": "JWT",
"alg": "ES256K"
}
current_time = int(time.time())
jwt_payload = {
"iat": current_time,
"exp": current_time + expires_in,
"iss": did,
**payload
}
# Convert private key to PEM format
private_key_int = int(private_key_hex, 16)
private_key = ec.derive_private_key(
private_key_int,
ec.SECP256K1()
)
# Serialize to PEM
pem_private = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# Sign using PyJWT with custom signing
token = jwt.encode(
jwt_payload,
pem_private,
algorithm="ES256K",
headers=headers
)
return token
# Step 6: Create and submit a registration claim
async def register(
active_did: str,
private_key_hex: str,
api_server: str = API_SERVER) -> Dict[str, Any]:
"""
Register a DID with the endorser service by submitting a signed verifiable credential.
This function creates and submits a registration claim to the endorser service. The process:
1. Creates a Verifiable Credential (VC) for registration
2. Signs the VC using ES256K-R JWT format
3. Submits the signed JWT to the endorser API
4. Handles the response and any potential errors
Args:
active_did (str): The DID to register (e.g., "did:ethr:0x...")
private_key_hex (str): Private key in hex format for signing the JWT
api_server (str, optional): Endorser API endpoint. Defaults to API_SERVER.
Returns:
Dict[str, Any]: Registration result with either:
{"success": True} for successful registration
{"error": str, "details": Dict} for failures
Raises:
requests.RequestException: For network or API communication errors
Example Success:
{
"success": True
}
Example Failure:
{
"error": "Registration failed",
"details": {
"error": {
"message": "JWT verification failed",
"code": "JWT_VERIFY_FAILED"
}
}
}
Technical Details:
- Uses ES256K-R JWT signing
- Follows W3C Verifiable Credentials data model
- Implements proper error handling and reporting
- Supports custom API endpoints for testing
"""
print("\nEndorser DID:", ENDORSER_DID)
print("Active DID:", active_did)
# Create registration claim matching TypeScript version
vcPayload = {
"vc": {
"@context": ["https://www.w3.org/2018/credentials/v1"],
"type": ["VerifiableCredential"],
"credentialSubject": {
"@context": "https://schema.org",
"@type": "RegisterAction",
"agent": {
"identifier": ENDORSER_DID
},
"participant": {
"identifier": active_did
},
"object": "endorser.ch",
"endTime": int(time.time()) + 7*24*60*60 # Next week epoch
}
}
}
# Debug output
print("\nRegistration Claim:")
print(json.dumps(vcPayload, indent=2))
print()
# Sign with endorser's DID and private key
jwt_token = await create_endorser_jwt(
did=ENDORSER_DID,
private_key_hex=ENDORSER_PRIVATE_KEY,
payload=vcPayload,
sub_did=active_did
)
# Debug output
print("Generated JWT:")
print(jwt_token)
print()
# Debug JWT parts
header, payload, signature = jwt_token.split('.')
print("\nJWT Header:", json.loads(base64.urlsafe_b64decode(header + '==').decode()))
print("JWT Payload:", json.loads(base64.urlsafe_b64decode(payload + '==').decode()))
print("JWT Signature length:", len(base64.urlsafe_b64decode(signature + '==')), "bytes")
print()
try:
response = requests.post(
f"{api_server}/api/v2/claim",
json={"jwtEncoded": jwt_token},
headers={"Content-Type": "application/json"}
)
response_data = response.json()
if response.status_code == 200 and response_data.get("success", {}).get("handleId"):
return {
"success": True,
"handleId": response_data["success"]["handleId"],
"registrationId": response_data["success"]["registrationId"],
"claimId": response_data["success"]["claimId"]
}
# Log full error response
print("\nAPI Error Response:")
print(json.dumps(response_data, indent=2))
print()
# Handle partial success
if response_data.get("success"):
return {
"success": True,
**response_data["success"]
}
return {"error": "Registration failed", "details": response_data}
except requests.RequestException as e:
print(f"\nAPI Request Error: {str(e)}")
return {"error": f"Error submitting claim: {str(e)}"}
async def fetch_claim(
claim_id: str,
active_did: str,
private_key_hex: str,
api_server: str = API_SERVER
) -> Dict[str, Any]:
"""
Fetch claim details from the endorser service.
Args:
claim_id (str): The ID of the claim to fetch
active_did (str): The DID making the request
private_key_hex (str): Private key for signing auth JWT
api_server (str, optional): API endpoint. Defaults to API_SERVER.
Returns:
Dict[str, Any]: The claim details
Raises:
requests.RequestException: For API communication errors
"""
# Create authentication JWT
auth_payload = {
"intent": "claim.read",
"claimId": claim_id
}
# Sign with the active DID (not endorser DID)
auth_token = await create_endorser_jwt(
active_did,
private_key_hex,
auth_payload
)
try:
response = requests.get(
f"{api_server}/api/claim/byHandle/{claim_id}",
headers={
'Authorization': f'Bearer {auth_token}',
'Content-Type': 'application/json'
}
)
return response.json()
except requests.RequestException as e:
print("\nError fetching claim:")
print(f"Error: {str(e)}")
raise
# Main execution
async def main():
"""
Main execution flow for DID creation and registration.
This function orchestrates the complete DID lifecycle:
1. Creates or loads a DID identity
2. Extracts the necessary credentials
3. Registers the DID with the endorser service
The process involves:
- Generating/loading a mnemonic seed phrase
- Deriving Ethereum keys and address
- Creating a DID identifier
- Signing registration claims
- Submitting to the endorser API
Side Effects:
- Creates mnemonic.txt if it doesn't exist
- Prints account initialization details
- Prints registration results
Error Handling:
- Handles API communication errors
- Reports registration failures
- Provides detailed error messages
Example Output:
```
Account initialized:
{
"did": "did:ethr:0x...",
"identity": {...},
"mnemonic": "word1 word2 ...",
"derivation_path": "m/84737769'/0'/0'/0'"
}
Registration result: {"success": true}
```
"""
# Step 1: Create a new DID
identity = initialize_account()
active_did = identity["did"]
private_key_hex = identity["keys"][0]["privateKeyHex"]
# Step 2: Register the DID
result = await register(active_did, private_key_hex)
print("Registration result:", result)
# Step 3: If registration successful, fetch claim details
if result.get("success") and result.get("claimId"):
print("\nFetching claim details...")
try:
claim_details = await fetch_claim(
result["claimId"],
active_did,
private_key_hex
)
print("\nClaim Details:")
print(json.dumps(claim_details, indent=2))
except Exception as e:
print(f"Error fetching claim: {str(e)}")
if __name__ == "__main__":
asyncio.run(main())