Files
crowd-funder-from-jason/test-scripts/did_generator.py
Matthew Raymer a4279fab34 feat: Add environment variable support for DID registration
- Bash implementation of DID creation-registration
- Move admin credentials to .env file for better security
- Add .env.example with default values
- Add dotenv support to TypeScript, Python and Bash implementations
- Update dependencies to include dotenv packages
- Fix JWT signature format in Bash implementation
- Add DER signature parsing for ES256K in Bash script

The admin DID and private key can now be configured via environment
variables, with fallback to default values if not set. This allows
for easier testing and deployment across different environments.
2025-03-04 06:27:20 +00:00

365 lines
12 KiB
Python

"""
DID Generator Script
@author Matthew Raymer
This script generates and registers Decentralized Identifiers (DIDs) with admin authorization.
It supports the creation of Ethereum-based DIDs (did:ethr) and handles the complete
registration flow with the endorser.ch API.
Features:
- Ethereum keypair generation with compressed public keys
- JWT creation and signing using ES256K
- DID registration with admin authorization
- Detailed error handling and logging
- Command-line interface for admin DID input
Dependencies:
eth_account: For Ethereum account operations
eth_keys: For key manipulation and compression
requests: For API communication
secrets: For secure random number generation
hashlib: For SHA-256 hashing
base64: For JWT encoding
argparse: For command-line argument parsing
pathlib: For path handling
dotenv: For environment variable loading
os: For environment variable access
Usage:
python did_generator.py [options]
Options:
--admin-did <did> Override default admin DID
--api-url <url> Override default API endpoint
Environment:
Default Admin DID: did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F
Default API URL: https://test-api.endorser.ch/api/v2/claim
"""
from eth_account import Account
import json
import base64
from eth_account.messages import encode_defunct
from eth_keys import keys
import time
import requests
import argparse
import secrets
import hashlib
from pathlib import Path
from dotenv import load_dotenv
import os
# Load environment variables
load_dotenv()
class DIDRegistration:
"""
Handles the creation and registration of DIDs with admin authorization.
This class manages the complete lifecycle of DID creation and registration:
1. Generating secure Ethereum keypairs with compressed public keys
2. Creating DIDs from public keys in did:ethr format
3. Signing registration claims with admin credentials
4. Submitting registration to the endorser.ch API
The registration process uses two keypairs:
1. Admin keypair: Used to sign and authorize the registration
2. New DID keypair: The identity being registered
Attributes:
api_url (str): Endpoint for DID registration
admin_keypair (dict): Admin's credentials containing:
- did: Admin's DID in did:ethr format
- private_key: Admin's private key for signing
"""
def __init__(self, admin_keypair: dict, api_url: str = None):
"""
Initialize DID registration with admin credentials.
Args:
admin_keypair (dict): Admin's DID and private key for signing
api_url (str, optional): Override default API URL
"""
self.api_url = api_url or "https://test-api.endorser.ch/api/v2/claim"
self.admin_keypair = admin_keypair # Store full admin keypair
Account.enable_unaudited_hdwallet_features()
def create_keypair(self) -> dict:
"""
Generate a new Ethereum keypair and associated DID.
Creates a secure random keypair and formats it for use with the
endorser.ch API. Uses compressed public key format to match
ethers.js implementation.
Returns:
dict: Keypair information containing:
- private_key: Raw private key without 0x prefix
- public_key: Compressed public key with 0x prefix
- address: Ethereum address
- did: Generated DID in did:ethr format
Security:
- Uses secrets module for cryptographically secure randomness
- Implements compressed public key format
- Maintains private key security
"""
private_key = secrets.token_hex(32)
# Create private key object and derive public key
private_key_bytes = bytes.fromhex(private_key)
private_key_obj = keys.PrivateKey(private_key_bytes)
# Get compressed public key (like ethers.js)
public_key_obj = private_key_obj.public_key
public_key = '0x' + public_key_obj.to_compressed_bytes().hex()
# Create account from private key (for address)
account = Account.from_key(private_key_bytes)
return {
'private_key': private_key, # No 0x prefix
'public_key': public_key, # With 0x prefix, compressed format
'address': account.address,
'did': f"did:ethr:{account.address}"
}
def sign_jwt(self, payload: dict, private_key: str, did: str) -> str:
"""
Sign a JWT using ES256K algorithm.
Creates and signs a JWT following the did-jwt specification:
1. Constructs header and payload
2. Base64url encodes components
3. Signs using ES256K
4. Assembles final JWT
Args:
payload (dict): JWT payload to sign
private_key (str): Private key for signing (without 0x prefix)
did (str): DID to use as issuer
Returns:
str: Signed JWT string in format: header.payload.signature
Security:
- Implements ES256K signing
- Follows did-jwt specification
- Handles message hashing correctly
"""
# Add issuer to payload like did-jwt does
full_payload = {
**payload,
"iss": did
}
header = {
"typ": "JWT",
"alg": "ES256K"
}
# Create the JWT segments
header_b64 = base64.urlsafe_b64encode(
json.dumps(header, separators=(',', ':')).encode()
).decode().rstrip('=')
payload_b64 = base64.urlsafe_b64encode(
json.dumps(full_payload, separators=(',', ':')).encode()
).decode().rstrip('=')
message = f"{header_b64}.{payload_b64}"
# Hash the message with sha256
message_hash = hashlib.sha256(message.encode()).digest()
# Sign using eth_keys directly
private_key_bytes = bytes.fromhex(private_key)
private_key_obj = keys.PrivateKey(private_key_bytes)
signature = private_key_obj.sign_msg_hash(message_hash)
# Get r and s from signature
r = signature.r.to_bytes(32, 'big')
s = signature.s.to_bytes(32, 'big')
signature_bytes = r + s
# Format signature
signature_b64 = base64.urlsafe_b64encode(signature_bytes).decode().rstrip('=')
return f"{message}.{signature_b64}"
def create_jwt(self, new_did: str) -> str:
"""
Create a signed JWT for DID registration
Args:
new_did (str): The DID being registered
"""
now = int(time.time())
# Create registration claim with admin as agent
register_claim = {
"@context": "https://schema.org",
"@type": "RegisterAction",
"agent": { "did": self.admin_keypair['did'] },
"participant": { "did": new_did },
"object": "endorser.ch"
}
payload = {
"iat": now,
"exp": now + 300,
"sub": "RegisterAction",
"vc": {
"@context": ["https://www.w3.org/2018/credentials/v1"],
"type": ["VerifiableCredential"],
"credentialSubject": register_claim
}
}
print(f"\nDebug - JWT payload: {json.dumps(payload, indent=2)}")
# Sign with admin's private key
return self.sign_jwt(
payload,
self.admin_keypair['private_key'],
self.admin_keypair['did']
)
def register_did(self, jwt_token: str) -> dict:
"""
Submit DID registration to the endorser.ch API.
Handles the complete registration process:
1. Submits JWT to API
2. Processes response
3. Formats result or error message
Args:
jwt_token (str): Signed JWT for registration
Returns:
dict: Registration result containing:
- success: Boolean indicating success
- response: API response data
- error: Error message if failed
Security:
- Uses HTTPS for API communication
- Validates response status
- Handles errors gracefully
"""
try:
response = requests.post(
self.api_url,
json={"jwtEncoded": jwt_token},
headers={'Content-Type': 'application/json'}
)
print(f"\nServer Response Status: {response.status_code}")
print(f"Server Response Body: {response.text}")
if response.status_code in [200, 201]:
return {
'success': True,
'response': response.json()
}
else:
try:
error_json = response.json()
error_msg = error_json.get('error', {}).get(
'message', 'Unknown error'
)
return {
'success': False,
'error': f"Registration failed ({response.status_code}): "
f"{error_msg}",
'response': error_json
}
except json.JSONDecodeError:
return {
'success': False,
'error': f"Registration failed ({response.status_code}): "
f"{response.text}",
'response': response.text
}
except (requests.RequestException, ConnectionError) as e:
return {
'success': False,
'error': f"Request failed: {str(e)}"
}
def main():
"""
Main entry point for DID generation script.
Handles:
1. Command line argument parsing
2. DID generation and registration process
3. Result output and error display
Usage:
python did_generator.py [options]
"""
parser = argparse.ArgumentParser(
description='Generate a DID with admin authorization'
)
parser.add_argument(
'--admin-did',
help='Admin DID (e.g., did:ethr:0x0000...)',
required=False
)
parser.add_argument(
'--api-url',
help='Override API URL',
default=os.getenv('ENDORSER_API_URL', 'https://test-api.endorser.ch/api/v2/claim')
)
args = parser.parse_args()
# Get admin credentials from environment
admin_keypair = {
'did': os.getenv('ADMIN_DID', 'did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F'),
'private_key': os.getenv('ADMIN_PRIVATE_KEY', '2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b')
}
admin_did = args.admin_did
if not admin_did:
admin_did = admin_keypair['did']
print(f"Admin DID: {admin_did}")
print(f"API URL: {args.api_url}")
print('Starting DID Generation...\n')
registrar = DIDRegistration(admin_keypair, args.api_url)
print("Generating new keypair...")
new_keypair = registrar.create_keypair()
print("\nGenerated DID Details:")
print("----------------------")
print(f"DID: {new_keypair['did']}")
print(f"Admin DID: {admin_did}")
print(f"Address: {new_keypair['address']}")
print(f"Private Key: {new_keypair['private_key']}")
print(f"Public Key: {new_keypair['public_key']}\n")
print("Creating JWT...")
jwt_token = registrar.create_jwt(new_keypair['did'])
print('\nSuccessfully generated DID with admin authorization!')
print(f'Registration JWT: {jwt_token[:50]}...')
print("\nAttempting registration...")
result = registrar.register_did(jwt_token)
if result['success']:
print("Registration successful!")
print("Response: {json.dumps(result['response'], indent=2)}")
else:
print("Registration failed!")
print(f"Error: {result['error']}")
if 'response' in result:
print(f"Full response: {json.dumps(result['response'], indent=2)}")
if __name__ == "__main__":
main()