Files
crowd-funder-from-jason/test-scripts/new_flow.py
Matthew Raymer ad9b4836cd feat: add JSON output files and improve test flow
- Add .generated directory for test artifacts
- Save key derivation data to key_derivation.json
- Save account initialization to account_init.json
- Save registration data to registration.json
- Save claim details to claim_details.json
- Save test environment to test-env.sh
- Save contacts data to contacts.json
- Add proper error handling for file operations
- Improve deeplink test flow with JSON-based data
- Add color output and better status messages
- Add ADB device detection and fallback to print mode

Technical Changes:
- Add file system operations with proper error handling
- Standardize JSON output format across Python/TypeScript
- Update test flow to use generated JSON files
- Add proper typing for registration response
- Improve error reporting and debug output

This improves the test workflow by saving all intermediate
data as JSON files that can be used by other test scripts.
The deeplink testing now uses this data instead of environment
variables for better reliability.
2025-03-08 13:01:15 +00:00

572 lines
17 KiB
Python

"""
DID Creation and Registration Flow
@author Matthew Raymer
This module implements the creation and registration of Decentralized Identifiers (DIDs)
with the endorser.ch service. It handles the complete workflow from mnemonic generation
to DID registration.
Flow:
1. Generate or load mnemonic seed phrase
2. Derive Ethereum keys and address
3. Create DID identifier
4. Initialize account
5. Create signed JWT
6. Register DID with endorser service
Features:
- Secure mnemonic generation and storage
- HD wallet key derivation
- ES256K-R JWT signing
- DID registration with endorser.ch
- Persistent mnemonic storage
Dependencies:
- mnemonic: For seed phrase generation
- eth_account: For Ethereum account operations
- eth_keys: For key manipulation
- web3: For Ethereum utilities
- requests: For API communication
Usage:
python new_flow.py
Environment:
API_SERVER: Endorser API endpoint (defaults to test server)
"""
import json
import time
from mnemonic import Mnemonic
from eth_account import Account
from eth_account.hdaccount import generate_mnemonic
import jwt
from eth_account._utils.signing import sign_message_hash
from eth_utils.curried import keccak
import requests
from typing import Tuple, Dict, Any
import os
from web3 import Web3
from eth_keys import keys
import base64
import json
from jwcrypto import jwk
import asyncio
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from pathlib import Path
# Constants
DEFAULT_ROOT_DERIVATION_PATH = "m/84737769'/0'/0'/0'" # Custom derivation path for TimeSafari
API_SERVER = "https://test-api.endorser.ch" # Endorser API endpoint
ENDORSER_DID = "did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F" # Keep original case
ENDORSER_PRIVATE_KEY = "2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b"
# Create .generated directory if it doesn't exist
GENERATED_DIR = Path('.generated')
def initialize_directories():
"""Create necessary directories for storing generated files."""
try:
GENERATED_DIR.mkdir(exist_ok=True)
except Exception as e:
print(f"Error creating .generated directory: {e}")
raise
def derive_address(
mnemonic: str,
derivation_path: str = DEFAULT_ROOT_DERIVATION_PATH) -> Tuple[str, str, str, str]:
"""
Derive Ethereum address and keys from a mnemonic phrase.
Args:
mnemonic (str): The mnemonic seed phrase
derivation_path (str): HD wallet derivation path
Returns:
Tuple[str, str, str, str]: (address, private_key_hex, public_key_hex, derivation_path)
"""
# Enable HD wallet features before using mnemonic
Account.enable_unaudited_hdwallet_features()
mnemonic = mnemonic.strip().lower()
seed = Mnemonic("english").to_seed(mnemonic)
# Derive the account using eth-account
account = Account.from_mnemonic(mnemonic, account_path=derivation_path)
address = account.address
private_key = account.key
# Format private key without '0x' prefix
private_hex = private_key.hex()[2:] if private_key.hex().startswith('0x') else private_key.hex()
# Derive public key from private key
pk = keys.PrivateKey(private_key)
public_key = pk.public_key
public_hex = public_key.to_hex()[2:] # Remove '0x' prefix
print(f" Address: {address}")
print(f" Private Key: {private_hex[:8]}...")
print(f" Public Key: {public_hex[:8]}...")
# Save derivation data
derivation_data = {
"mnemonic": mnemonic,
"derivation_path": derivation_path,
"address": address,
"private_key": private_hex,
"public_key": public_hex
}
try:
GENERATED_DIR.mkdir(exist_ok=True)
with open(GENERATED_DIR / 'key_derivation.json', 'w') as f:
json.dump(derivation_data, f, indent=2)
except Exception as e:
print(f"Error saving derivation data: {e}")
return address, private_hex, public_hex, derivation_path
def new_identifier(
address: str,
public_key_hex: str,
private_key_hex: str,
derivation_path: str) -> Dict[str, Any]:
"""
Create a new DID identifier with associated keys.
Args:
address (str): Ethereum address
public_key_hex (str): Public key in hex format
private_key_hex (str): Private key in hex format
derivation_path (str): HD wallet derivation path
Returns:
Dict[str, Any]: DID identifier object with keys and services
"""
did = f"did:ethr:{address}"
key_id = f"{did}#keys-1"
identifier = {
"did": did,
"keys": [{
"id": key_id,
"type": "Secp256k1VerificationKey2018",
"controller": did,
"ethereumAddress": address,
"publicKeyHex": public_key_hex,
"privateKeyHex": private_key_hex
}],
"services": []
}
return identifier
def initialize_account() -> Dict[str, Any]:
"""
Initialize a new DID account or load existing one.
This function either generates a new mnemonic or loads an existing one
from mnemonic.txt, then derives the necessary keys and creates a DID.
Returns:
Dict[str, Any]: The initialized DID identity object
Side effects:
- Creates/reads mnemonic.txt file
- Prints account initialization details
"""
Account.enable_unaudited_hdwallet_features()
# Generate or load mnemonic
if not os.path.exists('mnemonic.txt'):
mnemonic = generate_mnemonic(lang="english", num_words=24)
with open('mnemonic.txt', 'w', encoding='utf-8') as f:
f.write(mnemonic)
else:
with open('mnemonic.txt', 'r', encoding='utf-8') as f:
mnemonic = f.read().strip()
# Derive keys and create identity
address, private_hex, public_hex, derivation_path = derive_address(mnemonic)
# Create DID identifier
identity = new_identifier(address, public_hex, private_hex, derivation_path)
# Save account data
account_data = {
"did": identity["did"],
"identity": identity,
"mnemonic": mnemonic,
"derivation_path": derivation_path
}
with open(GENERATED_DIR / 'account_init.json', 'w') as f:
json.dump(account_data, f, indent=2)
print("\nAccount initialized:")
print(json.dumps(account_data, indent=2))
print()
return identity
async def create_endorser_jwt(
did: str,
private_key_hex: str,
payload: Dict[str, Any],
sub_did: str = None,
expires_in: int = 3600) -> str:
"""Create a signed JWT for DID registration."""
# Prepare JWT headers and payload
headers = {
"typ": "JWT",
"alg": "ES256K"
}
current_time = int(time.time())
jwt_payload = {
"iat": current_time,
"exp": current_time + expires_in,
"iss": did,
**payload
}
# Convert private key to PEM format
private_key_int = int(private_key_hex, 16)
private_key = ec.derive_private_key(
private_key_int,
ec.SECP256K1()
)
# Serialize to PEM
pem_private = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# Sign using PyJWT with custom signing
token = jwt.encode(
jwt_payload,
pem_private,
algorithm="ES256K",
headers=headers
)
return token
# Step 6: Create and submit a registration claim
async def register(
active_did: str,
private_key_hex: str,
api_server: str = API_SERVER) -> Dict[str, Any]:
"""
Register a DID with the endorser service by submitting a signed verifiable credential.
This function creates and submits a registration claim to the endorser service. The process:
1. Creates a Verifiable Credential (VC) for registration
2. Signs the VC using ES256K-R JWT format
3. Submits the signed JWT to the endorser API
4. Handles the response and any potential errors
Args:
active_did (str): The DID to register (e.g., "did:ethr:0x...")
private_key_hex (str): Private key in hex format for signing the JWT
api_server (str, optional): Endorser API endpoint. Defaults to API_SERVER.
Returns:
Dict[str, Any]: Registration result with either:
{"success": True} for successful registration
{"error": str, "details": Dict} for failures
Raises:
requests.RequestException: For network or API communication errors
Example Success:
{
"success": True
}
Example Failure:
{
"error": "Registration failed",
"details": {
"error": {
"message": "JWT verification failed",
"code": "JWT_VERIFY_FAILED"
}
}
}
Technical Details:
- Uses ES256K-R JWT signing
- Follows W3C Verifiable Credentials data model
- Implements proper error handling and reporting
- Supports custom API endpoints for testing
"""
print("\nEndorser DID:", ENDORSER_DID)
print("Active DID:", active_did)
# Create registration claim matching TypeScript version
vcPayload = {
"vc": {
"@context": ["https://www.w3.org/2018/credentials/v1"],
"type": ["VerifiableCredential"],
"credentialSubject": {
"@context": "https://schema.org",
"@type": "RegisterAction",
"agent": {
"identifier": ENDORSER_DID
},
"participant": {
"identifier": active_did
},
"object": "endorser.ch",
"endTime": int(time.time()) + 7*24*60*60 # Next week epoch
}
}
}
# Debug output
print("\nRegistration Claim:")
print(json.dumps(vcPayload, indent=2))
print()
# Sign with endorser's DID and private key
jwt_token = await create_endorser_jwt(
did=ENDORSER_DID,
private_key_hex=ENDORSER_PRIVATE_KEY,
payload=vcPayload,
sub_did=active_did
)
# Debug output
print("Generated JWT:")
print(jwt_token)
print()
# Debug JWT parts
header, payload, signature = jwt_token.split('.')
print("\nJWT Header:", json.loads(base64.urlsafe_b64decode(header + '==').decode()))
print("JWT Payload:", json.loads(base64.urlsafe_b64decode(payload + '==').decode()))
print("JWT Signature length:", len(base64.urlsafe_b64decode(signature + '==')), "bytes")
print()
try:
response = requests.post(
f"{api_server}/api/v2/claim",
json={"jwtEncoded": jwt_token},
headers={"Content-Type": "application/json"}
)
response_data = response.json()
if response.status_code == 200 and response_data.get("success", {}).get("handleId"):
# Save registration data
registration_data = {
"active_did": active_did,
"jwt_token": jwt_token,
"response": response_data
}
with open(GENERATED_DIR / 'registration.json', 'w') as f:
json.dump(registration_data, f, indent=2)
return {
"success": True,
"handleId": response_data["success"]["handleId"],
"registrationId": response_data["success"]["registrationId"],
"claimId": response_data["success"]["claimId"]
}
# Log full error response
print("\nAPI Error Response:")
print(json.dumps(response_data, indent=2))
print()
# Handle partial success
if response_data.get("success"):
return {
"success": True,
**response_data["success"]
}
return {"error": "Registration failed", "details": response_data}
except requests.RequestException as e:
print(f"\nAPI Request Error: {str(e)}")
return {"error": f"Error submitting claim: {str(e)}"}
async def fetch_claim(
claim_id: str,
active_did: str,
private_key_hex: str,
api_server: str = API_SERVER
) -> Dict[str, Any]:
"""
Fetch claim details from the endorser service.
Args:
claim_id (str): The ID of the claim to fetch
active_did (str): The DID making the request
private_key_hex (str): Private key for signing auth JWT
api_server (str, optional): API endpoint. Defaults to API_SERVER.
Returns:
Dict[str, Any]: The claim details
Raises:
requests.RequestException: For API communication errors
"""
# Create authentication JWT
auth_payload = {
"intent": "claim.read",
"claimId": claim_id
}
# Sign with the active DID (not endorser DID)
auth_token = await create_endorser_jwt(
active_did,
private_key_hex,
auth_payload
)
try:
response = requests.get(
f"{api_server}/api/claim/byHandle/{claim_id}",
headers={
'Authorization': f'Bearer {auth_token}',
'Content-Type': 'application/json'
},
timeout=30 # 30 second timeout
)
claim_data = response.json()
# Save claim data
try:
with open(GENERATED_DIR / 'claim_details.json', 'w', encoding='utf-8') as f:
json.dump({
'claim_id': claim_id,
'active_did': active_did,
'response': claim_data
}, f, indent=2)
except Exception as e:
print(f"Error saving claim data: {e}")
return claim_data
except requests.RequestException as e:
print("\nError fetching claim:")
print(f"Error: {str(e)}")
# Save error state
try:
with open(
GENERATED_DIR / 'claim_details.json',
'w', encoding='utf-8') as f:
json.dump({
'claim_id': claim_id,
'active_did': active_did,
'error': str(e)
}, f, indent=2)
except Exception as write_err:
print(f"Error saving claim error: {write_err}")
raise
async def generate_test_env():
"""Generate test environment data for deeplink testing"""
test_env_data = {
'CONTACT1_DID': active_did,
'CONTACT1_KEY': private_key_hex,
'CONTACT2_DID': recipient_did,
'CONTACT2_KEY': recipient_key,
'ISSUER_DID': issuer_did,
'ISSUER_KEY': issuer_key,
'TEST_JWT': jwt_token,
'API_SERVER': API_SERVER
}
# Write test environment variables
with open(GENERATED_DIR / 'test-env.sh', 'w', encoding='utf-8') as f:
for key, value in test_env_data.items():
f.write(f'export {key}="{value}"\n')
# Write test contacts data
contacts_data = {
'contacts': [
{'did': active_did, 'name': 'Test Contact 1'},
{'did': recipient_did, 'name': 'Test Contact 2'}
]
}
with open(GENERATED_DIR / 'contacts.json', 'w', encoding='utf-8') as f:
json.dump(contacts_data, f, indent=2)
# Main execution
async def main():
"""
Main execution flow for DID creation and registration.
This function orchestrates the complete DID lifecycle:
1. Creates or loads a DID identity
2. Extracts the necessary credentials
3. Registers the DID with the endorser service
The process involves:
- Generating/loading a mnemonic seed phrase
- Deriving Ethereum keys and address
- Creating a DID identifier
- Signing registration claims
- Submitting to the endorser API
Side Effects:
- Creates mnemonic.txt if it doesn't exist
- Prints account initialization details
- Prints registration results
Error Handling:
- Handles API communication errors
- Reports registration failures
- Provides detailed error messages
Example Output:
```
Account initialized:
{
"did": "did:ethr:0x...",
"identity": {...},
"mnemonic": "word1 word2 ...",
"derivation_path": "m/84737769'/0'/0'/0'"
}
Registration result: {"success": true}
```
"""
# Initialize directories first
initialize_directories()
try:
# Step 1: Create a new DID
identity = initialize_account()
active_did = identity["did"]
private_key_hex = identity["keys"][0]["privateKeyHex"]
# Step 2: Register the DID
result = await register(active_did, private_key_hex)
print("Registration result:", result)
# Step 3: If registration successful, fetch claim details
if result.get("success") and result.get("claimId"):
print("\nFetching claim details...")
try:
claim_details = await fetch_claim(
result["claimId"],
active_did,
private_key_hex
)
print("\nClaim Details:")
print(json.dumps(claim_details, indent=2))
except Exception as e:
print(f"Error fetching claim: {str(e)}")
# Step 4: Generate test environment data
await generate_test_env()
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
asyncio.run(main())