You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
364 lines
12 KiB
364 lines
12 KiB
"""
|
|
DID Generator Script
|
|
@author Matthew Raymer
|
|
|
|
This script generates and registers Decentralized Identifiers (DIDs) with admin authorization.
|
|
It supports the creation of Ethereum-based DIDs (did:ethr) and handles the complete
|
|
registration flow with the endorser.ch API.
|
|
|
|
Features:
|
|
- Ethereum keypair generation with compressed public keys
|
|
- JWT creation and signing using ES256K
|
|
- DID registration with admin authorization
|
|
- Detailed error handling and logging
|
|
- Command-line interface for admin DID input
|
|
|
|
Dependencies:
|
|
eth_account: For Ethereum account operations
|
|
eth_keys: For key manipulation and compression
|
|
requests: For API communication
|
|
secrets: For secure random number generation
|
|
hashlib: For SHA-256 hashing
|
|
base64: For JWT encoding
|
|
argparse: For command-line argument parsing
|
|
pathlib: For path handling
|
|
dotenv: For environment variable loading
|
|
os: For environment variable access
|
|
|
|
Usage:
|
|
python did_generator.py [options]
|
|
|
|
Options:
|
|
--admin-did <did> Override default admin DID
|
|
--api-url <url> Override default API endpoint
|
|
|
|
Environment:
|
|
Default Admin DID: did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F
|
|
Default API URL: https://test-api.endorser.ch/api/v2/claim
|
|
"""
|
|
|
|
from eth_account import Account
|
|
import json
|
|
import base64
|
|
from eth_account.messages import encode_defunct
|
|
from eth_keys import keys
|
|
import time
|
|
import requests
|
|
import argparse
|
|
import secrets
|
|
import hashlib
|
|
from pathlib import Path
|
|
from dotenv import load_dotenv
|
|
import os
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
class DIDRegistration:
|
|
"""
|
|
Handles the creation and registration of DIDs with admin authorization.
|
|
|
|
This class manages the complete lifecycle of DID creation and registration:
|
|
1. Generating secure Ethereum keypairs with compressed public keys
|
|
2. Creating DIDs from public keys in did:ethr format
|
|
3. Signing registration claims with admin credentials
|
|
4. Submitting registration to the endorser.ch API
|
|
|
|
The registration process uses two keypairs:
|
|
1. Admin keypair: Used to sign and authorize the registration
|
|
2. New DID keypair: The identity being registered
|
|
|
|
Attributes:
|
|
api_url (str): Endpoint for DID registration
|
|
admin_keypair (dict): Admin's credentials containing:
|
|
- did: Admin's DID in did:ethr format
|
|
- private_key: Admin's private key for signing
|
|
"""
|
|
|
|
def __init__(self, admin_keypair: dict, api_url: str = None):
|
|
"""
|
|
Initialize DID registration with admin credentials.
|
|
|
|
Args:
|
|
admin_keypair (dict): Admin's DID and private key for signing
|
|
api_url (str, optional): Override default API URL
|
|
"""
|
|
self.api_url = api_url or "https://test-api.endorser.ch/api/v2/claim"
|
|
self.admin_keypair = admin_keypair # Store full admin keypair
|
|
Account.enable_unaudited_hdwallet_features()
|
|
|
|
def create_keypair(self) -> dict:
|
|
"""
|
|
Generate a new Ethereum keypair and associated DID.
|
|
|
|
Creates a secure random keypair and formats it for use with the
|
|
endorser.ch API. Uses compressed public key format to match
|
|
ethers.js implementation.
|
|
|
|
Returns:
|
|
dict: Keypair information containing:
|
|
- private_key: Raw private key without 0x prefix
|
|
- public_key: Compressed public key with 0x prefix
|
|
- address: Ethereum address
|
|
- did: Generated DID in did:ethr format
|
|
|
|
Security:
|
|
- Uses secrets module for cryptographically secure randomness
|
|
- Implements compressed public key format
|
|
- Maintains private key security
|
|
"""
|
|
private_key = secrets.token_hex(32)
|
|
|
|
# Create private key object and derive public key
|
|
private_key_bytes = bytes.fromhex(private_key)
|
|
private_key_obj = keys.PrivateKey(private_key_bytes)
|
|
|
|
# Get compressed public key (like ethers.js)
|
|
public_key_obj = private_key_obj.public_key
|
|
public_key = '0x' + public_key_obj.to_compressed_bytes().hex()
|
|
|
|
# Create account from private key (for address)
|
|
account = Account.from_key(private_key_bytes)
|
|
|
|
return {
|
|
'private_key': private_key, # No 0x prefix
|
|
'public_key': public_key, # With 0x prefix, compressed format
|
|
'address': account.address,
|
|
'did': f"did:ethr:{account.address}"
|
|
}
|
|
|
|
def sign_jwt(self, payload: dict, private_key: str, did: str) -> str:
|
|
"""
|
|
Sign a JWT using ES256K algorithm.
|
|
|
|
Creates and signs a JWT following the did-jwt specification:
|
|
1. Constructs header and payload
|
|
2. Base64url encodes components
|
|
3. Signs using ES256K
|
|
4. Assembles final JWT
|
|
|
|
Args:
|
|
payload (dict): JWT payload to sign
|
|
private_key (str): Private key for signing (without 0x prefix)
|
|
did (str): DID to use as issuer
|
|
|
|
Returns:
|
|
str: Signed JWT string in format: header.payload.signature
|
|
|
|
Security:
|
|
- Implements ES256K signing
|
|
- Follows did-jwt specification
|
|
- Handles message hashing correctly
|
|
"""
|
|
# Add issuer to payload like did-jwt does
|
|
full_payload = {
|
|
**payload,
|
|
"iss": did
|
|
}
|
|
|
|
header = {
|
|
"typ": "JWT",
|
|
"alg": "ES256K"
|
|
}
|
|
|
|
# Create the JWT segments
|
|
header_b64 = base64.urlsafe_b64encode(
|
|
json.dumps(header, separators=(',', ':')).encode()
|
|
).decode().rstrip('=')
|
|
payload_b64 = base64.urlsafe_b64encode(
|
|
json.dumps(full_payload, separators=(',', ':')).encode()
|
|
).decode().rstrip('=')
|
|
message = f"{header_b64}.{payload_b64}"
|
|
|
|
# Hash the message with sha256
|
|
message_hash = hashlib.sha256(message.encode()).digest()
|
|
|
|
# Sign using eth_keys directly
|
|
private_key_bytes = bytes.fromhex(private_key)
|
|
private_key_obj = keys.PrivateKey(private_key_bytes)
|
|
signature = private_key_obj.sign_msg_hash(message_hash)
|
|
|
|
# Get r and s from signature
|
|
r = signature.r.to_bytes(32, 'big')
|
|
s = signature.s.to_bytes(32, 'big')
|
|
signature_bytes = r + s
|
|
|
|
# Format signature
|
|
signature_b64 = base64.urlsafe_b64encode(signature_bytes).decode().rstrip('=')
|
|
|
|
return f"{message}.{signature_b64}"
|
|
|
|
def create_jwt(self, new_did: str) -> str:
|
|
"""
|
|
Create a signed JWT for DID registration
|
|
|
|
Args:
|
|
new_did (str): The DID being registered
|
|
"""
|
|
now = int(time.time())
|
|
|
|
# Create registration claim with admin as agent
|
|
register_claim = {
|
|
"@context": "https://schema.org",
|
|
"@type": "RegisterAction",
|
|
"agent": { "did": self.admin_keypair['did'] },
|
|
"participant": { "did": new_did },
|
|
"object": "endorser.ch"
|
|
}
|
|
|
|
payload = {
|
|
"iat": now,
|
|
"exp": now + 300,
|
|
"sub": "RegisterAction",
|
|
"vc": {
|
|
"@context": ["https://www.w3.org/2018/credentials/v1"],
|
|
"type": ["VerifiableCredential"],
|
|
"credentialSubject": register_claim
|
|
}
|
|
}
|
|
|
|
print(f"\nDebug - JWT payload: {json.dumps(payload, indent=2)}")
|
|
|
|
# Sign with admin's private key
|
|
return self.sign_jwt(
|
|
payload,
|
|
self.admin_keypair['private_key'],
|
|
self.admin_keypair['did']
|
|
)
|
|
|
|
def register_did(self, jwt_token: str) -> dict:
|
|
"""
|
|
Submit DID registration to the endorser.ch API.
|
|
|
|
Handles the complete registration process:
|
|
1. Submits JWT to API
|
|
2. Processes response
|
|
3. Formats result or error message
|
|
|
|
Args:
|
|
jwt_token (str): Signed JWT for registration
|
|
|
|
Returns:
|
|
dict: Registration result containing:
|
|
- success: Boolean indicating success
|
|
- response: API response data
|
|
- error: Error message if failed
|
|
|
|
Security:
|
|
- Uses HTTPS for API communication
|
|
- Validates response status
|
|
- Handles errors gracefully
|
|
"""
|
|
try:
|
|
response = requests.post(
|
|
self.api_url,
|
|
json={"jwtEncoded": jwt_token},
|
|
headers={'Content-Type': 'application/json'}
|
|
)
|
|
|
|
print(f"\nServer Response Status: {response.status_code}")
|
|
print(f"Server Response Body: {response.text}")
|
|
|
|
if response.status_code in [200, 201]:
|
|
return {
|
|
'success': True,
|
|
'response': response.json()
|
|
}
|
|
else:
|
|
try:
|
|
error_json = response.json()
|
|
error_msg = error_json.get('error', {}).get(
|
|
'message', 'Unknown error'
|
|
)
|
|
return {
|
|
'success': False,
|
|
'error': f"Registration failed ({response.status_code}): "
|
|
f"{error_msg}",
|
|
'response': error_json
|
|
}
|
|
except json.JSONDecodeError:
|
|
return {
|
|
'success': False,
|
|
'error': f"Registration failed ({response.status_code}): "
|
|
f"{response.text}",
|
|
'response': response.text
|
|
}
|
|
except (requests.RequestException, ConnectionError) as e:
|
|
return {
|
|
'success': False,
|
|
'error': f"Request failed: {str(e)}"
|
|
}
|
|
|
|
def main():
|
|
"""
|
|
Main entry point for DID generation script.
|
|
|
|
Handles:
|
|
1. Command line argument parsing
|
|
2. DID generation and registration process
|
|
3. Result output and error display
|
|
|
|
Usage:
|
|
python did_generator.py [options]
|
|
"""
|
|
parser = argparse.ArgumentParser(
|
|
description='Generate a DID with admin authorization'
|
|
)
|
|
parser.add_argument(
|
|
'--admin-did',
|
|
help='Admin DID (e.g., did:ethr:0x0000...)',
|
|
required=False
|
|
)
|
|
parser.add_argument(
|
|
'--api-url',
|
|
help='Override API URL',
|
|
default=os.getenv('ENDORSER_API_URL', 'https://test-api.endorser.ch/api/v2/claim')
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
# Get admin credentials from environment
|
|
admin_keypair = {
|
|
'did': os.getenv('ADMIN_DID', 'did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F'),
|
|
'private_key': os.getenv('ADMIN_PRIVATE_KEY', '2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b')
|
|
}
|
|
|
|
admin_did = args.admin_did
|
|
if not admin_did:
|
|
admin_did = admin_keypair['did']
|
|
|
|
print(f"Admin DID: {admin_did}")
|
|
print(f"API URL: {args.api_url}")
|
|
|
|
print('Starting DID Generation...\n')
|
|
registrar = DIDRegistration(admin_keypair, args.api_url)
|
|
|
|
print("Generating new keypair...")
|
|
new_keypair = registrar.create_keypair()
|
|
|
|
print("\nGenerated DID Details:")
|
|
print("----------------------")
|
|
print(f"DID: {new_keypair['did']}")
|
|
print(f"Admin DID: {admin_did}")
|
|
print(f"Address: {new_keypair['address']}")
|
|
print(f"Private Key: {new_keypair['private_key']}")
|
|
print(f"Public Key: {new_keypair['public_key']}\n")
|
|
|
|
print("Creating JWT...")
|
|
jwt_token = registrar.create_jwt(new_keypair['did'])
|
|
|
|
print('\nSuccessfully generated DID with admin authorization!')
|
|
print(f'Registration JWT: {jwt_token[:50]}...')
|
|
|
|
print("\nAttempting registration...")
|
|
result = registrar.register_did(jwt_token)
|
|
if result['success']:
|
|
print("Registration successful!")
|
|
print("Response: {json.dumps(result['response'], indent=2)}")
|
|
else:
|
|
print("Registration failed!")
|
|
print(f"Error: {result['error']}")
|
|
if 'response' in result:
|
|
print(f"Full response: {json.dumps(result['response'], indent=2)}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|