You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
571 lines
17 KiB
571 lines
17 KiB
"""
|
|
DID Creation and Registration Flow
|
|
@author Matthew Raymer
|
|
|
|
This module implements the creation and registration of Decentralized Identifiers (DIDs)
|
|
with the endorser.ch service. It handles the complete workflow from mnemonic generation
|
|
to DID registration.
|
|
|
|
Flow:
|
|
1. Generate or load mnemonic seed phrase
|
|
2. Derive Ethereum keys and address
|
|
3. Create DID identifier
|
|
4. Initialize account
|
|
5. Create signed JWT
|
|
6. Register DID with endorser service
|
|
|
|
Features:
|
|
- Secure mnemonic generation and storage
|
|
- HD wallet key derivation
|
|
- ES256K JWT signing
|
|
- DID registration with endorser.ch
|
|
- Persistent mnemonic storage
|
|
|
|
Dependencies:
|
|
- mnemonic: For seed phrase generation
|
|
- eth_account: For Ethereum account operations
|
|
- eth_keys: For key manipulation
|
|
- web3: For Ethereum utilities
|
|
- requests: For API communication
|
|
|
|
Usage:
|
|
python new_flow.py
|
|
|
|
Environment:
|
|
API_SERVER: Endorser API endpoint (defaults to test server)
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from mnemonic import Mnemonic
|
|
from eth_account import Account
|
|
from eth_account.hdaccount import generate_mnemonic
|
|
import jwt
|
|
from eth_account._utils.signing import sign_message_hash
|
|
from eth_utils.curried import keccak
|
|
import requests
|
|
from typing import Tuple, Dict, Any
|
|
import os
|
|
from web3 import Web3
|
|
from eth_keys import keys
|
|
import base64
|
|
import json
|
|
from jwcrypto import jwk
|
|
import asyncio
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
from pathlib import Path
|
|
|
|
# Constants

# Custom derivation path for TimeSafari
DEFAULT_ROOT_DERIVATION_PATH = "m/84737769'/0'/0'/0'"

# Endorser API endpoint. Honours the API_SERVER environment variable described
# in the module docstring and falls back to the test server when it is unset
# or empty.
API_SERVER = os.environ.get("API_SERVER") or "https://test-api.endorser.ch"

# DID used as issuer/agent of registration claims. Keep original case.
ENDORSER_DID = "did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F"

# NOTE(review): signing key committed in source. Presumably a throwaway key for
# the test server - confirm before pointing API_SERVER at any other deployment.
ENDORSER_PRIVATE_KEY = "2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b"

# Directory that receives generated artifacts. This assignment only names the
# path; the directory itself is created by initialize_directories().
GENERATED_DIR = Path('.generated')
|
|
|
|
def initialize_directories():
    """
    Ensure the output directory for generated files exists.

    An already existing directory is not an error. Any failure is reported
    on stdout and then re-raised to the caller unchanged.
    """
    try:
        GENERATED_DIR.mkdir(exist_ok=True)
    except Exception as mkdir_error:
        # Report for the console user, but let the caller decide what to do.
        print(f"Error creating .generated directory: {mkdir_error}")
        raise
|
|
|
|
def derive_address(
        mnemonic: str,
        derivation_path: str = DEFAULT_ROOT_DERIVATION_PATH) -> Tuple[str, str, str, str]:
    """
    Derive Ethereum address and keys from a mnemonic phrase.

    The mnemonic is stripped and lower-cased before use. The derived values
    are printed (keys truncated to their first 8 hex characters) and written
    to ``key_derivation.json`` inside GENERATED_DIR on a best-effort basis.

    NOTE: the JSON file contains the mnemonic and the full private key in
    plain text.

    Args:
        mnemonic (str): The mnemonic seed phrase
        derivation_path (str): HD wallet derivation path

    Returns:
        Tuple[str, str, str, str]: (address, private_key_hex, public_key_hex,
        derivation_path); both hex strings carry no '0x' prefix.
    """
    # Enable HD wallet features before using mnemonic
    Account.enable_unaudited_hdwallet_features()

    mnemonic = mnemonic.strip().lower()

    # Derive the account using eth-account. (The previous implementation also
    # computed a BIP-39 seed here that was never used; it has been dropped.)
    account = Account.from_mnemonic(mnemonic, account_path=derivation_path)
    address = account.address
    private_key = account.key

    # Format private key without '0x' prefix; whether .hex() emits the prefix
    # depends on the library version, so check instead of slicing blindly.
    private_hex = private_key.hex()
    if private_hex.startswith('0x'):
        private_hex = private_hex[2:]

    # Derive public key from private key
    public_key = keys.PrivateKey(private_key).public_key
    public_hex = public_key.to_hex()[2:]  # Remove '0x' prefix

    print(f"  Address: {address}")
    print(f"  Private Key: {private_hex[:8]}...")
    print(f"  Public Key: {public_hex[:8]}...")

    # Save derivation data
    derivation_data = {
        "mnemonic": mnemonic,
        "derivation_path": derivation_path,
        "address": address,
        "private_key": private_hex,
        "public_key": public_hex
    }
    try:
        GENERATED_DIR.mkdir(exist_ok=True)
        with open(GENERATED_DIR / 'key_derivation.json', 'w', encoding='utf-8') as f:
            json.dump(derivation_data, f, indent=2)
    except OSError as e:
        # Persisting is best-effort: the derived values are still returned.
        print(f"Error saving derivation data: {e}")

    return address, private_hex, public_hex, derivation_path
|
|
|
|
def new_identifier(
        address: str,
        public_key_hex: str,
        private_key_hex: str,
        derivation_path: str) -> Dict[str, Any]:
    """
    Build the DID identifier record for an Ethereum address.

    The DID is ``did:ethr:<address>`` and it carries exactly one key entry
    whose id is ``<did>#keys-1``.

    Args:
        address (str): Ethereum address
        public_key_hex (str): Public key in hex format
        private_key_hex (str): Private key in hex format
        derivation_path (str): HD wallet derivation path (accepted but not
            stored in the returned record)

    Returns:
        Dict[str, Any]: Mapping with "did", a one-element "keys" list and an
        empty "services" list
    """
    did = f"did:ethr:{address}"

    # The single verification key controlled by the DID itself.
    primary_key = {
        "id": f"{did}#keys-1",
        "type": "Secp256k1VerificationKey2018",
        "controller": did,
        "ethereumAddress": address,
        "publicKeyHex": public_key_hex,
        "privateKeyHex": private_key_hex
    }

    return {
        "did": did,
        "keys": [primary_key],
        "services": []
    }
|
|
|
|
def initialize_account() -> Dict[str, Any]:
    """
    Initialize a new DID account or load existing one.

    The mnemonic is read from ``mnemonic.txt`` in the current directory. When
    that file is missing, or exists but contains only whitespace, a fresh
    24-word English mnemonic is generated and written to it. Keys are then
    derived and wrapped in a DID identifier.

    Returns:
        Dict[str, Any]: The initialized DID identity object

    Side effects:
        - Creates/reads mnemonic.txt file
        - Writes account_init.json into GENERATED_DIR
        - Prints account initialization details, including the mnemonic and
          the private key, to stdout
    """
    Account.enable_unaudited_hdwallet_features()

    # Load the stored mnemonic (EAFP: no separate existence check).
    try:
        with open('mnemonic.txt', 'r', encoding='utf-8') as f:
            mnemonic = f.read().strip()
    except FileNotFoundError:
        mnemonic = ""

    # A missing or blank file both mean "no mnemonic yet": generate and persist.
    if not mnemonic:
        mnemonic = generate_mnemonic(lang="english", num_words=24)
        with open('mnemonic.txt', 'w', encoding='utf-8') as f:
            f.write(mnemonic)

    # Derive keys and create identity
    address, private_hex, public_hex, derivation_path = derive_address(mnemonic)

    # Create DID identifier
    identity = new_identifier(address, public_hex, private_hex, derivation_path)

    # Save account data
    account_data = {
        "did": identity["did"],
        "identity": identity,
        "mnemonic": mnemonic,
        "derivation_path": derivation_path
    }
    with open(GENERATED_DIR / 'account_init.json', 'w', encoding='utf-8') as f:
        json.dump(account_data, f, indent=2)

    print("\nAccount initialized:")
    print(json.dumps(account_data, indent=2))
    print()
    return identity
|
|
|
|
async def create_endorser_jwt(
        did: str,
        private_key_hex: str,
        payload: Dict[str, Any],
        sub_did: str = None,
        expires_in: int = 3600) -> str:
    """
    Create a signed JWT for DID registration.

    The token carries ``iat`` (now), ``exp`` (now + expires_in) and ``iss``
    (the given DID), followed by every entry of ``payload``; payload entries
    win on key collisions. It is signed with the ES256K algorithm.

    Args:
        did (str): DID placed in the ``iss`` claim
        private_key_hex (str): secp256k1 private key as a hex string
        payload (Dict[str, Any]): Additional claims merged into the token
        sub_did (str, optional): Accepted for callers, but not used when
            building the token
        expires_in (int): Token lifetime in seconds

    Returns:
        str: The encoded JWT as produced by ``jwt.encode``
    """
    issued_at = int(time.time())
    claims = {
        "iat": issued_at,
        "exp": issued_at + expires_in,
        "iss": did,
        **payload
    }

    # Rebuild the secp256k1 key object from its integer value, then export it
    # as unencrypted PKCS8 PEM, which is the form handed to jwt.encode.
    secret_scalar = int(private_key_hex, 16)
    signing_key = ec.derive_private_key(secret_scalar, ec.SECP256K1())
    signing_pem = signing_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    jose_headers = {
        "typ": "JWT",
        "alg": "ES256K"
    }
    return jwt.encode(
        claims,
        signing_pem,
        algorithm="ES256K",
        headers=jose_headers
    )
|
|
|
|
# Step 6: Create and submit a registration claim
async def register(
        active_did: str,
        private_key_hex: str,
        api_server: str = API_SERVER) -> Dict[str, Any]:
    """
    Register a DID with the endorser service by submitting a signed verifiable credential.

    This function creates and submits a registration claim to the endorser service. The process:
    1. Creates a Verifiable Credential (VC) payload with a schema.org RegisterAction
    2. Signs it as an ES256K JWT using ENDORSER_DID / ENDORSER_PRIVATE_KEY
    3. POSTs the signed JWT to ``{api_server}/api/v2/claim``
    4. Interprets the response and reports failures as a result dict

    Args:
        active_did (str): The DID to register (e.g., "did:ethr:0x...")
        private_key_hex (str): Private key of the active DID. Kept for
            interface compatibility; the claim is signed with the endorser
            key, not with this one.
        api_server (str, optional): Endorser API endpoint. Defaults to API_SERVER.

    Returns:
        Dict[str, Any]: Registration result with either:
            {"success": True, "handleId": ..., "registrationId": ..., "claimId": ...}
                for a 200 response whose "success" object has a handleId
                (ids missing from the response are reported as None)
            {"success": True, ...} for any other response carrying a truthy
                "success" value
            {"error": str, "details": ...} when the service rejects the claim
                or answers with a non-JSON body
            {"error": str} for network or API communication errors

    Side effects:
        - Prints the claim, the JWT and its decoded parts to stdout
        - On success, writes registration.json into GENERATED_DIR (best-effort)

    Example Failure:
        {
            "error": "Registration failed",
            "details": {
                "error": {
                    "message": "JWT verification failed",
                    "code": "JWT_VERIFY_FAILED"
                }
            }
        }
    """
    print("\nEndorser DID:", ENDORSER_DID)
    print("Active DID:", active_did)

    # Create registration claim matching TypeScript version
    vc_payload = {
        "vc": {
            "@context": ["https://www.w3.org/2018/credentials/v1"],
            "type": ["VerifiableCredential"],
            "credentialSubject": {
                "@context": "https://schema.org",
                "@type": "RegisterAction",
                "agent": {
                    "identifier": ENDORSER_DID
                },
                "participant": {
                    "identifier": active_did
                },
                "object": "endorser.ch",
                "endTime": int(time.time()) + 7*24*60*60  # Next week epoch
            }
        }
    }

    # Debug output
    print("\nRegistration Claim:")
    print(json.dumps(vc_payload, indent=2))
    print()

    # Sign with endorser's DID and private key
    jwt_token = await create_endorser_jwt(
        did=ENDORSER_DID,
        private_key_hex=ENDORSER_PRIVATE_KEY,
        payload=vc_payload,
        sub_did=active_did
    )

    # Debug output
    print("Generated JWT:")
    print(jwt_token)
    print()

    # Debug JWT parts. JWT segments are unpadded base64url; appending '=='
    # supplies enough padding for the decoder in every case.
    header, payload, signature = jwt_token.split('.')
    print("\nJWT Header:", json.loads(base64.urlsafe_b64decode(header + '==').decode()))
    print("JWT Payload:", json.loads(base64.urlsafe_b64decode(payload + '==').decode()))
    print("JWT Signature length:", len(base64.urlsafe_b64decode(signature + '==')), "bytes")
    print()

    try:
        response = requests.post(
            f"{api_server}/api/v2/claim",
            json={"jwtEncoded": jwt_token},
            headers={"Content-Type": "application/json"},
            timeout=30  # 30 second timeout, same as fetch_claim
        )

        try:
            response_data = response.json()
        except ValueError:
            # Body is not JSON (e.g. an HTML error page from a proxy).
            print("\nAPI Error Response (non-JSON):")
            print(response.text)
            print()
            return {
                "error": "Registration failed",
                "details": {
                    "status_code": response.status_code,
                    "body": response.text
                }
            }

        # Only call .get() on values that really are mappings.
        success = response_data.get("success") if isinstance(response_data, dict) else None

        if response.status_code == 200 and isinstance(success, dict) and success.get("handleId"):
            # Save registration data (best-effort: a failed write must not
            # turn a completed registration into an error).
            registration_data = {
                "active_did": active_did,
                "jwt_token": jwt_token,
                "response": response_data
            }
            try:
                with open(GENERATED_DIR / 'registration.json', 'w', encoding='utf-8') as f:
                    json.dump(registration_data, f, indent=2)
            except OSError as e:
                print(f"Error saving registration data: {e}")

            return {
                "success": True,
                "handleId": success["handleId"],
                "registrationId": success.get("registrationId"),
                "claimId": success.get("claimId")
            }

        # Log full error response
        print("\nAPI Error Response:")
        print(json.dumps(response_data, indent=2))
        print()

        # Handle partial success
        if success:
            if isinstance(success, dict):
                return {
                    "success": True,
                    **success
                }
            return {"success": True}

        return {"error": "Registration failed", "details": response_data}

    except requests.RequestException as e:
        print(f"\nAPI Request Error: {str(e)}")
        return {"error": f"Error submitting claim: {str(e)}"}
|
|
|
|
async def fetch_claim(
        claim_id: str,
        active_did: str,
        private_key_hex: str,
        api_server: str = API_SERVER
) -> Dict[str, Any]:
    """
    Fetch claim details from the endorser service.

    A bearer JWT with a ``claim.read`` intent is signed by the active DID and
    sent with a GET to ``{api_server}/api/claim/byHandle/{claim_id}``. The
    outcome, either the decoded response or the error text, is written to
    ``claim_details.json`` in GENERATED_DIR on a best-effort basis.

    Args:
        claim_id (str): The ID of the claim to fetch
        active_did (str): The DID making the request
        private_key_hex (str): Private key for signing auth JWT
        api_server (str, optional): API endpoint. Defaults to API_SERVER.

    Returns:
        Dict[str, Any]: The decoded JSON body of the response

    Raises:
        requests.RequestException: For API communication errors (re-raised
            after the error has been printed and recorded)
    """
    # Sign with the active DID (not endorser DID)
    auth_token = await create_endorser_jwt(
        active_did,
        private_key_hex,
        {
            "intent": "claim.read",
            "claimId": claim_id
        }
    )

    request_headers = {
        'Authorization': f'Bearer {auth_token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.get(
            f"{api_server}/api/claim/byHandle/{claim_id}",
            headers=request_headers,
            timeout=30  # 30 second timeout
        )
        claim_data = response.json()
    except requests.RequestException as e:
        print("\nError fetching claim:")
        print(f"Error: {str(e)}")

        # Record the failure next to where a success would have been stored.
        failure_record = {
            'claim_id': claim_id,
            'active_did': active_did,
            'error': str(e)
        }
        try:
            with open(
                    GENERATED_DIR / 'claim_details.json',
                    'w', encoding='utf-8') as f:
                json.dump(failure_record, f, indent=2)
        except Exception as write_err:
            print(f"Error saving claim error: {write_err}")
        raise

    # Save claim data; a failed write is reported but does not lose the result.
    success_record = {
        'claim_id': claim_id,
        'active_did': active_did,
        'response': claim_data
    }
    try:
        with open(GENERATED_DIR / 'claim_details.json', 'w', encoding='utf-8') as f:
            json.dump(success_record, f, indent=2)
    except Exception as e:
        print(f"Error saving claim data: {e}")

    return claim_data
|
|
|
|
async def generate_test_env(
        active_did: str = "",
        private_key_hex: str = "",
        recipient_did: str = "",
        recipient_key: str = "",
        issuer_did: str = "",
        issuer_key: str = "",
        jwt_token: str = "") -> None:
    """
    Generate test environment data for deeplink testing.

    Writes two files into GENERATED_DIR:
        - test-env.sh: one ``export NAME="value"`` line per variable
        - contacts.json: the two test contacts

    Previously this function read these values from names that were never
    defined, so every call failed with NameError. They are now explicit
    parameters, all optional so that existing no-argument calls keep working.

    Args:
        active_did (str): DID exported as CONTACT1_DID / "Test Contact 1"
        private_key_hex (str): Key exported as CONTACT1_KEY
        recipient_did (str): DID exported as CONTACT2_DID / "Test Contact 2"
        recipient_key (str): Key exported as CONTACT2_KEY
        issuer_did (str): Exported as ISSUER_DID; falls back to ENDORSER_DID
            when empty
        issuer_key (str): Exported as ISSUER_KEY; falls back to
            ENDORSER_PRIVATE_KEY when empty
        jwt_token (str): Exported as TEST_JWT

    Values that are not supplied are written as empty strings.
    """
    test_env_data = {
        'CONTACT1_DID': active_did,
        'CONTACT1_KEY': private_key_hex,
        'CONTACT2_DID': recipient_did,
        'CONTACT2_KEY': recipient_key,
        'ISSUER_DID': issuer_did or ENDORSER_DID,
        'ISSUER_KEY': issuer_key or ENDORSER_PRIVATE_KEY,
        'TEST_JWT': jwt_token,
        'API_SERVER': API_SERVER
    }

    # Write test environment variables
    with open(GENERATED_DIR / 'test-env.sh', 'w', encoding='utf-8') as f:
        f.writelines(
            f'export {key}="{value}"\n' for key, value in test_env_data.items()
        )

    # Write test contacts data
    contacts_data = {
        'contacts': [
            {'did': active_did, 'name': 'Test Contact 1'},
            {'did': recipient_did, 'name': 'Test Contact 2'}
        ]
    }
    with open(GENERATED_DIR / 'contacts.json', 'w', encoding='utf-8') as f:
        json.dump(contacts_data, f, indent=2)


# Main execution
async def main():
    """
    Main execution flow for DID creation and registration.

    This function orchestrates the complete DID lifecycle:
    1. Creates or loads a DID identity
    2. Extracts the necessary credentials
    3. Registers the DID with the endorser service
    4. Fetches the claim details when registration returned a claimId
    5. Generates the test environment files

    The process involves:
    - Generating/loading a mnemonic seed phrase
    - Deriving Ethereum keys and address
    - Creating a DID identifier
    - Signing registration claims
    - Submitting to the endorser API

    Side Effects:
    - Creates mnemonic.txt if it doesn't exist
    - Writes files into the .generated directory
    - Prints account initialization details
    - Prints registration results

    Error Handling:
    - A failure to create the output directory propagates to the caller
    - A failed claim fetch is printed and does not stop the flow
    - Any other exception is printed as "Error: ..." and ends the flow

    Example Output:
    ```
    Account initialized:
    {
      "did": "did:ethr:0x...",
      "identity": {...},
      "mnemonic": "word1 word2 ...",
      "derivation_path": "m/84737769'/0'/0'/0'"
    }

    Registration result: {"success": true}
    ```
    """
    # Initialize directories first
    initialize_directories()

    try:
        # Step 1: Create a new DID
        identity = initialize_account()
        active_did = identity["did"]
        private_key_hex = identity["keys"][0]["privateKeyHex"]

        # Step 2: Register the DID
        result = await register(active_did, private_key_hex)
        print("Registration result:", result)

        # Step 3: If registration successful, fetch claim details
        if result.get("success") and result.get("claimId"):
            print("\nFetching claim details...")
            try:
                claim_details = await fetch_claim(
                    result["claimId"],
                    active_did,
                    private_key_hex
                )
                print("\nClaim Details:")
                print(json.dumps(claim_details, indent=2))
            except Exception as e:
                print(f"Error fetching claim: {str(e)}")

        # Step 4: Generate test environment data from the identity created
        # above. No second contact or test JWT exists in this flow, so those
        # entries are left at their empty defaults.
        await generate_test_env(
            active_did=active_did,
            private_key_hex=private_key_hex
        )

    except Exception as e:
        # Top-level boundary of the script: report and finish.
        print(f"Error: {str(e)}")
|
|
|
|
# Script entry point: run the async workflow only when the file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
|
|
|