You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

378 lines
12 KiB

"""
DID Generator Script
@author Matthew Raymer
This script generates and registers Decentralized Identifiers (DIDs) with admin authorization.
It supports the creation of Ethereum-based DIDs (did:ethr) and handles the complete
registration flow with the endorser.ch API.
Features:
- Ethereum keypair generation with compressed public keys
- JWT creation and signing using ES256K
- DID registration with admin authorization
- Detailed error handling and logging
- Command-line interface for admin DID input
Dependencies:
eth_account: For Ethereum account operations
eth_keys: For key manipulation and compression
requests: For API communication
secrets: For secure random number generation
hashlib: For SHA-256 hashing
base64: For JWT encoding
argparse: For command-line argument parsing
sqlite3: For database operations
pathlib: For path handling
Usage:
python did_generator.py <admin_did>
Example:
python did_generator.py did:ethr:0x1234...5678
"""
from eth_account import Account
import json
import base64
from eth_account.messages import encode_defunct
from eth_keys import keys
import time
import requests
import argparse
import secrets
import hashlib
import sqlite3
from pathlib import Path
class DIDRegistration:
"""
Handles the creation and registration of DIDs with admin authorization.
This class manages the complete lifecycle of DID creation:
1. Generating secure Ethereum keypairs
2. Creating DIDs from public keys
3. Signing registration claims
4. Submitting registration to the endorser.ch API
Attributes:
api_url (str): Endpoint for DID registration
admin_did (str): Administrator DID for authorization
"""
def __init__(self, admin_did: str):
"""
Initialize DID registration with admin credentials.
Args:
admin_did (str): Administrator DID for authorizing registrations
Format: did:ethr:0x...
"""
self.api_url = "https://api.endorser.ch/api/v2/claim"
self.admin_did = admin_did
Account.enable_unaudited_hdwallet_features()
def create_keypair(self) -> dict:
"""
Generate a new Ethereum keypair and associated DID.
Creates a secure random keypair and formats it for use with the
endorser.ch API, including compressed public key format matching
ethers.js implementation.
Returns:
dict: Keypair information containing:
- private_key: Raw private key without 0x prefix
- public_key: Compressed public key with 0x prefix
- address: Ethereum address
- did: Generated DID in did:ethr format
Security:
- Uses secrets module for cryptographically secure randomness
- Implements compressed public key format
- Maintains private key security
"""
private_key = secrets.token_hex(32)
# Create private key object and derive public key
private_key_bytes = bytes.fromhex(private_key)
private_key_obj = keys.PrivateKey(private_key_bytes)
# Get compressed public key (like ethers.js)
public_key_obj = private_key_obj.public_key
public_key = '0x' + public_key_obj.to_compressed_bytes().hex()
# Create account from private key (for address)
account = Account.from_key(private_key_bytes)
return {
'private_key': private_key, # No 0x prefix
'public_key': public_key, # With 0x prefix, compressed format
'address': account.address,
'did': f"did:ethr:{account.address}"
}
def sign_jwt(self, payload: dict, private_key: str, did: str) -> str:
"""
Sign a JWT using ES256K algorithm.
Creates and signs a JWT following the did-jwt specification:
1. Constructs header and payload
2. Base64url encodes components
3. Signs using ES256K
4. Assembles final JWT
Args:
payload (dict): JWT payload to sign
private_key (str): Private key for signing (without 0x prefix)
did (str): DID to use as issuer
Returns:
str: Signed JWT string in format: header.payload.signature
Security:
- Implements ES256K signing
- Follows did-jwt specification
- Handles message hashing correctly
"""
# Add issuer to payload like did-jwt does
full_payload = {
**payload,
"iss": did
}
header = {
"typ": "JWT",
"alg": "ES256K"
}
# Create the JWT segments
header_b64 = base64.urlsafe_b64encode(
json.dumps(header, separators=(',', ':')).encode()
).decode().rstrip('=')
payload_b64 = base64.urlsafe_b64encode(
json.dumps(full_payload, separators=(',', ':')).encode()
).decode().rstrip('=')
message = f"{header_b64}.{payload_b64}"
# Hash the message with sha256 first (like did-jwt)
message_hash = hashlib.sha256(message.encode()).digest()
# Sign the hash directly (not as an Ethereum message)
private_key_bytes = bytes.fromhex(private_key)
account = Account.from_key(private_key_bytes)
signed = account.sign_message(message_hash)
# Get r and s from signature
r = signed.r.to_bytes(32, 'big')
s = signed.s.to_bytes(32, 'big')
signature_bytes = r + s
signature = base64.urlsafe_b64encode(signature_bytes).decode().rstrip('=')
return f"{message}.{signature}"
def create_jwt(self, keypair: dict) -> str:
"""
Create a signed JWT for DID registration.
Creates a registration claim that includes:
1. Admin DID as agent
2. New DID as participant
3. Timestamp and expiration
4. Required credential context
Args:
keypair (dict): Generated keypair information
Returns:
str: Signed JWT containing registration claim
Note:
Matches TypeScript implementation exactly for compatibility
"""
now = int(time.time())
# Create registration claim with admin as agent
register_claim = {
"@context": "https://schema.org",
"@type": "RegisterAction",
"agent": { "did": self.admin_did },
"participant": { "did": keypair['did'] },
"object": "endorser.ch"
}
# Match the TypeScript vcPayload exactly - no iss field
payload = {
"iat": now,
"exp": now + 300,
"sub": "RegisterAction",
"vc": {
"@context": ["https://www.w3.org/2018/credentials/v1"],
"type": ["VerifiableCredential"],
"credentialSubject": register_claim
}
}
print(f"\nDebug - JWT payload: {json.dumps(payload, indent=2)}")
# Sign with new DID's private key
return self.sign_jwt(payload, keypair['private_key'], keypair['did'])
def register_did(self, jwt_token: str) -> dict:
"""
Submit DID registration to the endorser.ch API.
Handles the complete registration process:
1. Submits JWT to API
2. Processes response
3. Formats result or error message
Args:
jwt_token (str): Signed JWT for registration
Returns:
dict: Registration result containing:
- success: Boolean indicating success
- response: API response data
- error: Error message if failed
Security:
- Uses HTTPS for API communication
- Validates response status
- Handles errors gracefully
"""
try:
response = requests.post(
self.api_url,
json={"jwtEncoded": jwt_token},
headers={'Content-Type': 'application/json'}
)
print(f"\nServer Response Status: {response.status_code}")
print(f"Server Response Body: {response.text}")
if response.status_code in [200, 201]:
return {
'success': True,
'response': response.json()
}
else:
try:
error_json = response.json()
error_msg = error_json.get('error', {}).get(
'message', 'Unknown error'
)
return {
'success': False,
'error': f"Registration failed ({response.status_code}): "
f"{error_msg}",
'response': error_json
}
except json.JSONDecodeError:
return {
'success': False,
'error': f"Registration failed ({response.status_code}): "
f"{response.text}",
'response': response.text
}
except (requests.RequestException, ConnectionError) as e:
return {
'success': False,
'error': f"Request failed: {str(e)}"
}
def get_root_did_from_db(db_path: str = None) -> str:
"""Get admin DID from the most recent registration."""
try:
db_path = db_path or str(Path.home() / '.endorser' / 'accounts.db')
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT issuer as did
FROM registration
WHERE type = 'RegisterAction'
ORDER BY issuanceDate DESC
LIMIT 1
""")
result = cursor.fetchone()
if not result:
raise ValueError('No admin DID found in registration table')
return result[0]
finally:
if 'conn' in locals():
conn.close()
def main():
"""
Main entry point for DID generation script.
Handles:
1. Command line argument parsing
2. DID generation and registration process
3. Result output and error display
Usage:
python did_generator.py <admin_did>
"""
parser = argparse.ArgumentParser(
description='Generate a DID with admin authorization'
)
parser.add_argument(
'--admin-did',
help='Admin DID (e.g., did:ethr:0x0000...)',
required=False
)
parser.add_argument(
'--db-path',
help='Path to SQLite database containing root DID',
required=False
)
args = parser.parse_args()
admin_did = args.admin_did
if not admin_did:
try:
admin_did = get_root_did_from_db(args.db_path)
print(f"Found root DID in database: {admin_did}")
except (FileNotFoundError, ValueError, sqlite3.Error) as e:
print(f"Error: {str(e)}")
print("Please provide --admin-did argument")
return
print('Starting DID Generation...\n')
registrar = DIDRegistration(admin_did)
print("Generating new keypair...")
keypair = registrar.create_keypair()
print("\nGenerated DID Details:")
print("----------------------")
print(f"DID: {keypair['did']}")
print(f"Admin DID: {admin_did}")
print(f"Address: {keypair['address']}")
print(f"Private Key: {keypair['private_key']}")
print(f"Public Key: {keypair['public_key']}\n")
print("Creating JWT...")
jwt_token = registrar.create_jwt(keypair)
print('\nSuccessfully generated DID with admin authorization!')
print(f'Registration JWT: {jwt_token[:50]}...')
print("\nAttempting registration...")
result = registrar.register_did(jwt_token)
if result['success']:
print("Registration successful!")
print("Response: {json.dumps(result['response'], indent=2)}")
else:
print("Registration failed!")
print(f"Error: {result['error']}")
if 'response' in result:
print(f"Full response: {json.dumps(result['response'], indent=2)}")
if __name__ == "__main__":
main()