forked from jsnbuchanan/crowd-funder-for-time-pwa
Changes: - Update SQL query to use registration table instead of accounts - Add proper column names for registration table schema - Add issuanceDate sorting for latest admin DID - Improve error messages for database queries - Add TypeScript types for database row results This fixes DID generation by using the correct table schema from the endorser database.
378 lines
12 KiB
Python
378 lines
12 KiB
Python
"""
|
|
DID Generator Script
|
|
@author Matthew Raymer
|
|
|
|
This script generates and registers Decentralized Identifiers (DIDs) with admin authorization.
|
|
It supports the creation of Ethereum-based DIDs (did:ethr) and handles the complete
|
|
registration flow with the endorser.ch API.
|
|
|
|
Features:
|
|
- Ethereum keypair generation with compressed public keys
|
|
- JWT creation and signing using ES256K
|
|
- DID registration with admin authorization
|
|
- Detailed error handling and logging
|
|
- Command-line interface for admin DID input
|
|
|
|
Dependencies:
|
|
eth_account: For Ethereum account operations
|
|
eth_keys: For key manipulation and compression
|
|
requests: For API communication
|
|
secrets: For secure random number generation
|
|
hashlib: For SHA-256 hashing
|
|
base64: For JWT encoding
|
|
argparse: For command-line argument parsing
|
|
sqlite3: For database operations
|
|
pathlib: For path handling
|
|
|
|
Usage:
|
|
python did_generator.py <admin_did>
|
|
|
|
Example:
|
|
python did_generator.py did:ethr:0x1234...5678
|
|
"""
|
|
|
|
from eth_account import Account
|
|
import json
|
|
import base64
|
|
from eth_account.messages import encode_defunct
|
|
from eth_keys import keys
|
|
import time
|
|
import requests
|
|
import argparse
|
|
import secrets
|
|
import hashlib
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
class DIDRegistration:
|
|
"""
|
|
Handles the creation and registration of DIDs with admin authorization.
|
|
|
|
This class manages the complete lifecycle of DID creation:
|
|
1. Generating secure Ethereum keypairs
|
|
2. Creating DIDs from public keys
|
|
3. Signing registration claims
|
|
4. Submitting registration to the endorser.ch API
|
|
|
|
Attributes:
|
|
api_url (str): Endpoint for DID registration
|
|
admin_did (str): Administrator DID for authorization
|
|
"""
|
|
|
|
def __init__(self, admin_did: str):
|
|
"""
|
|
Initialize DID registration with admin credentials.
|
|
|
|
Args:
|
|
admin_did (str): Administrator DID for authorizing registrations
|
|
Format: did:ethr:0x...
|
|
"""
|
|
self.api_url = "https://api.endorser.ch/api/v2/claim"
|
|
self.admin_did = admin_did
|
|
Account.enable_unaudited_hdwallet_features()
|
|
|
|
def create_keypair(self) -> dict:
|
|
"""
|
|
Generate a new Ethereum keypair and associated DID.
|
|
|
|
Creates a secure random keypair and formats it for use with the
|
|
endorser.ch API, including compressed public key format matching
|
|
ethers.js implementation.
|
|
|
|
Returns:
|
|
dict: Keypair information containing:
|
|
- private_key: Raw private key without 0x prefix
|
|
- public_key: Compressed public key with 0x prefix
|
|
- address: Ethereum address
|
|
- did: Generated DID in did:ethr format
|
|
|
|
Security:
|
|
- Uses secrets module for cryptographically secure randomness
|
|
- Implements compressed public key format
|
|
- Maintains private key security
|
|
"""
|
|
private_key = secrets.token_hex(32)
|
|
|
|
# Create private key object and derive public key
|
|
private_key_bytes = bytes.fromhex(private_key)
|
|
private_key_obj = keys.PrivateKey(private_key_bytes)
|
|
|
|
# Get compressed public key (like ethers.js)
|
|
public_key_obj = private_key_obj.public_key
|
|
public_key = '0x' + public_key_obj.to_compressed_bytes().hex()
|
|
|
|
# Create account from private key (for address)
|
|
account = Account.from_key(private_key_bytes)
|
|
|
|
return {
|
|
'private_key': private_key, # No 0x prefix
|
|
'public_key': public_key, # With 0x prefix, compressed format
|
|
'address': account.address,
|
|
'did': f"did:ethr:{account.address}"
|
|
}
|
|
|
|
def sign_jwt(self, payload: dict, private_key: str, did: str) -> str:
|
|
"""
|
|
Sign a JWT using ES256K algorithm.
|
|
|
|
Creates and signs a JWT following the did-jwt specification:
|
|
1. Constructs header and payload
|
|
2. Base64url encodes components
|
|
3. Signs using ES256K
|
|
4. Assembles final JWT
|
|
|
|
Args:
|
|
payload (dict): JWT payload to sign
|
|
private_key (str): Private key for signing (without 0x prefix)
|
|
did (str): DID to use as issuer
|
|
|
|
Returns:
|
|
str: Signed JWT string in format: header.payload.signature
|
|
|
|
Security:
|
|
- Implements ES256K signing
|
|
- Follows did-jwt specification
|
|
- Handles message hashing correctly
|
|
"""
|
|
# Add issuer to payload like did-jwt does
|
|
full_payload = {
|
|
**payload,
|
|
"iss": did
|
|
}
|
|
|
|
header = {
|
|
"typ": "JWT",
|
|
"alg": "ES256K"
|
|
}
|
|
|
|
# Create the JWT segments
|
|
header_b64 = base64.urlsafe_b64encode(
|
|
json.dumps(header, separators=(',', ':')).encode()
|
|
).decode().rstrip('=')
|
|
payload_b64 = base64.urlsafe_b64encode(
|
|
json.dumps(full_payload, separators=(',', ':')).encode()
|
|
).decode().rstrip('=')
|
|
message = f"{header_b64}.{payload_b64}"
|
|
|
|
# Hash the message with sha256 first (like did-jwt)
|
|
message_hash = hashlib.sha256(message.encode()).digest()
|
|
|
|
# Sign the hash directly (not as an Ethereum message)
|
|
private_key_bytes = bytes.fromhex(private_key)
|
|
account = Account.from_key(private_key_bytes)
|
|
signed = account.sign_message(message_hash)
|
|
|
|
# Get r and s from signature
|
|
r = signed.r.to_bytes(32, 'big')
|
|
s = signed.s.to_bytes(32, 'big')
|
|
signature_bytes = r + s
|
|
|
|
signature = base64.urlsafe_b64encode(signature_bytes).decode().rstrip('=')
|
|
|
|
return f"{message}.{signature}"
|
|
|
|
def create_jwt(self, keypair: dict) -> str:
|
|
"""
|
|
Create a signed JWT for DID registration.
|
|
|
|
Creates a registration claim that includes:
|
|
1. Admin DID as agent
|
|
2. New DID as participant
|
|
3. Timestamp and expiration
|
|
4. Required credential context
|
|
|
|
Args:
|
|
keypair (dict): Generated keypair information
|
|
|
|
Returns:
|
|
str: Signed JWT containing registration claim
|
|
|
|
Note:
|
|
Matches TypeScript implementation exactly for compatibility
|
|
"""
|
|
now = int(time.time())
|
|
|
|
# Create registration claim with admin as agent
|
|
register_claim = {
|
|
"@context": "https://schema.org",
|
|
"@type": "RegisterAction",
|
|
"agent": { "did": self.admin_did },
|
|
"participant": { "did": keypair['did'] },
|
|
"object": "endorser.ch"
|
|
}
|
|
|
|
# Match the TypeScript vcPayload exactly - no iss field
|
|
payload = {
|
|
"iat": now,
|
|
"exp": now + 300,
|
|
"sub": "RegisterAction",
|
|
"vc": {
|
|
"@context": ["https://www.w3.org/2018/credentials/v1"],
|
|
"type": ["VerifiableCredential"],
|
|
"credentialSubject": register_claim
|
|
}
|
|
}
|
|
|
|
print(f"\nDebug - JWT payload: {json.dumps(payload, indent=2)}")
|
|
|
|
# Sign with new DID's private key
|
|
return self.sign_jwt(payload, keypair['private_key'], keypair['did'])
|
|
|
|
def register_did(self, jwt_token: str) -> dict:
|
|
"""
|
|
Submit DID registration to the endorser.ch API.
|
|
|
|
Handles the complete registration process:
|
|
1. Submits JWT to API
|
|
2. Processes response
|
|
3. Formats result or error message
|
|
|
|
Args:
|
|
jwt_token (str): Signed JWT for registration
|
|
|
|
Returns:
|
|
dict: Registration result containing:
|
|
- success: Boolean indicating success
|
|
- response: API response data
|
|
- error: Error message if failed
|
|
|
|
Security:
|
|
- Uses HTTPS for API communication
|
|
- Validates response status
|
|
- Handles errors gracefully
|
|
"""
|
|
try:
|
|
response = requests.post(
|
|
self.api_url,
|
|
json={"jwtEncoded": jwt_token},
|
|
headers={'Content-Type': 'application/json'}
|
|
)
|
|
|
|
print(f"\nServer Response Status: {response.status_code}")
|
|
print(f"Server Response Body: {response.text}")
|
|
|
|
if response.status_code in [200, 201]:
|
|
return {
|
|
'success': True,
|
|
'response': response.json()
|
|
}
|
|
else:
|
|
try:
|
|
error_json = response.json()
|
|
error_msg = error_json.get('error', {}).get(
|
|
'message', 'Unknown error'
|
|
)
|
|
return {
|
|
'success': False,
|
|
'error': f"Registration failed ({response.status_code}): "
|
|
f"{error_msg}",
|
|
'response': error_json
|
|
}
|
|
except json.JSONDecodeError:
|
|
return {
|
|
'success': False,
|
|
'error': f"Registration failed ({response.status_code}): "
|
|
f"{response.text}",
|
|
'response': response.text
|
|
}
|
|
except (requests.RequestException, ConnectionError) as e:
|
|
return {
|
|
'success': False,
|
|
'error': f"Request failed: {str(e)}"
|
|
}
|
|
|
|
def get_root_did_from_db(db_path: str = None) -> str:
|
|
"""Get admin DID from the most recent registration."""
|
|
try:
|
|
db_path = db_path or str(Path.home() / '.endorser' / 'accounts.db')
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT issuer as did
|
|
FROM registration
|
|
WHERE type = 'RegisterAction'
|
|
ORDER BY issuanceDate DESC
|
|
LIMIT 1
|
|
""")
|
|
|
|
result = cursor.fetchone()
|
|
if not result:
|
|
raise ValueError('No admin DID found in registration table')
|
|
|
|
return result[0]
|
|
finally:
|
|
if 'conn' in locals():
|
|
conn.close()
|
|
|
|
def main():
|
|
"""
|
|
Main entry point for DID generation script.
|
|
|
|
Handles:
|
|
1. Command line argument parsing
|
|
2. DID generation and registration process
|
|
3. Result output and error display
|
|
|
|
Usage:
|
|
python did_generator.py <admin_did>
|
|
"""
|
|
parser = argparse.ArgumentParser(
|
|
description='Generate a DID with admin authorization'
|
|
)
|
|
parser.add_argument(
|
|
'--admin-did',
|
|
help='Admin DID (e.g., did:ethr:0x0000...)',
|
|
required=False
|
|
)
|
|
parser.add_argument(
|
|
'--db-path',
|
|
help='Path to SQLite database containing root DID',
|
|
required=False
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
admin_did = args.admin_did
|
|
if not admin_did:
|
|
try:
|
|
admin_did = get_root_did_from_db(args.db_path)
|
|
print(f"Found root DID in database: {admin_did}")
|
|
except (FileNotFoundError, ValueError, sqlite3.Error) as e:
|
|
print(f"Error: {str(e)}")
|
|
print("Please provide --admin-did argument")
|
|
return
|
|
|
|
print('Starting DID Generation...\n')
|
|
registrar = DIDRegistration(admin_did)
|
|
|
|
print("Generating new keypair...")
|
|
keypair = registrar.create_keypair()
|
|
|
|
print("\nGenerated DID Details:")
|
|
print("----------------------")
|
|
print(f"DID: {keypair['did']}")
|
|
print(f"Admin DID: {admin_did}")
|
|
print(f"Address: {keypair['address']}")
|
|
print(f"Private Key: {keypair['private_key']}")
|
|
print(f"Public Key: {keypair['public_key']}\n")
|
|
|
|
print("Creating JWT...")
|
|
jwt_token = registrar.create_jwt(keypair)
|
|
|
|
print('\nSuccessfully generated DID with admin authorization!')
|
|
print(f'Registration JWT: {jwt_token[:50]}...')
|
|
|
|
print("\nAttempting registration...")
|
|
result = registrar.register_did(jwt_token)
|
|
if result['success']:
|
|
print("Registration successful!")
|
|
print("Response: {json.dumps(result['response'], indent=2)}")
|
|
else:
|
|
print("Registration failed!")
|
|
print(f"Error: {result['error']}")
|
|
if 'response' in result:
|
|
print(f"Full response: {json.dumps(result['response'], indent=2)}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|