forked from jsnbuchanan/crowd-funder-for-time-pwa
feat: Improve DID registration with admin credentials
- Add admin keypair (DID + private key) for proper registration signing - Remove SQLite database dependency for admin DID lookup - Update both TypeScript and Python implementations to use admin credentials - Enhance documentation with detailed usage, options, and security notes - Fix JWT signing to use admin's key as issuer - Standardize API URL handling with defaults - Add command-line options for admin DID and API URL override Both implementations now successfully register new DIDs using the admin's credentials to sign the registration claims. The admin acts as both the agent in the claim and the issuer of the JWT.
This commit is contained in:
@@ -21,14 +21,18 @@ Dependencies:
|
||||
hashlib: For SHA-256 hashing
|
||||
base64: For JWT encoding
|
||||
argparse: For command-line argument parsing
|
||||
sqlite3: For database operations
|
||||
pathlib: For path handling
|
||||
|
||||
Usage:
|
||||
python did_generator.py <admin_did>
|
||||
python did_generator.py [options]
|
||||
|
||||
Example:
|
||||
python did_generator.py did:ethr:0x1234...5678
|
||||
Options:
|
||||
--admin-did <did> Override default admin DID
|
||||
--api-url <url> Override default API endpoint
|
||||
|
||||
Environment:
|
||||
Default Admin DID: did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F
|
||||
Default API URL: https://test-api.endorser.ch/api/v2/claim
|
||||
"""
|
||||
|
||||
from eth_account import Account
|
||||
@@ -41,34 +45,39 @@ import requests
|
||||
import argparse
|
||||
import secrets
|
||||
import hashlib
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
class DIDRegistration:
|
||||
"""
|
||||
Handles the creation and registration of DIDs with admin authorization.
|
||||
|
||||
This class manages the complete lifecycle of DID creation:
|
||||
1. Generating secure Ethereum keypairs
|
||||
2. Creating DIDs from public keys
|
||||
3. Signing registration claims
|
||||
This class manages the complete lifecycle of DID creation and registration:
|
||||
1. Generating secure Ethereum keypairs with compressed public keys
|
||||
2. Creating DIDs from public keys in did:ethr format
|
||||
3. Signing registration claims with admin credentials
|
||||
4. Submitting registration to the endorser.ch API
|
||||
|
||||
The registration process uses two keypairs:
|
||||
1. Admin keypair: Used to sign and authorize the registration
|
||||
2. New DID keypair: The identity being registered
|
||||
|
||||
Attributes:
|
||||
api_url (str): Endpoint for DID registration
|
||||
admin_did (str): Administrator DID for authorization
|
||||
admin_keypair (dict): Admin's credentials containing:
|
||||
- did: Admin's DID in did:ethr format
|
||||
- private_key: Admin's private key for signing
|
||||
"""
|
||||
|
||||
def __init__(self, admin_did: str):
|
||||
def __init__(self, admin_keypair: dict, api_url: str = None):
|
||||
"""
|
||||
Initialize DID registration with admin credentials.
|
||||
|
||||
Args:
|
||||
admin_did (str): Administrator DID for authorizing registrations
|
||||
Format: did:ethr:0x...
|
||||
admin_keypair (dict): Admin's DID and private key for signing
|
||||
api_url (str, optional): Override default API URL
|
||||
"""
|
||||
self.api_url = "https://api.endorser.ch/api/v2/claim"
|
||||
self.admin_did = admin_did
|
||||
self.api_url = api_url or "https://test-api.endorser.ch/api/v2/claim"
|
||||
self.admin_keypair = admin_keypair # Store full admin keypair
|
||||
Account.enable_unaudited_hdwallet_features()
|
||||
|
||||
def create_keypair(self) -> dict:
|
||||
@@ -76,7 +85,7 @@ class DIDRegistration:
|
||||
Generate a new Ethereum keypair and associated DID.
|
||||
|
||||
Creates a secure random keypair and formats it for use with the
|
||||
endorser.ch API, including compressed public key format matching
|
||||
endorser.ch API. Uses compressed public key format to match
|
||||
ethers.js implementation.
|
||||
|
||||
Returns:
|
||||
@@ -154,41 +163,30 @@ class DIDRegistration:
|
||||
).decode().rstrip('=')
|
||||
message = f"{header_b64}.{payload_b64}"
|
||||
|
||||
# Hash the message with sha256 first (like did-jwt)
|
||||
# Hash the message with sha256
|
||||
message_hash = hashlib.sha256(message.encode()).digest()
|
||||
|
||||
# Sign the hash directly (not as an Ethereum message)
|
||||
# Sign using eth_keys directly
|
||||
private_key_bytes = bytes.fromhex(private_key)
|
||||
account = Account.from_key(private_key_bytes)
|
||||
signed = account.sign_message(message_hash)
|
||||
private_key_obj = keys.PrivateKey(private_key_bytes)
|
||||
signature = private_key_obj.sign_msg_hash(message_hash)
|
||||
|
||||
# Get r and s from signature
|
||||
r = signed.r.to_bytes(32, 'big')
|
||||
s = signed.s.to_bytes(32, 'big')
|
||||
r = signature.r.to_bytes(32, 'big')
|
||||
s = signature.s.to_bytes(32, 'big')
|
||||
signature_bytes = r + s
|
||||
|
||||
signature = base64.urlsafe_b64encode(signature_bytes).decode().rstrip('=')
|
||||
# Format signature
|
||||
signature_b64 = base64.urlsafe_b64encode(signature_bytes).decode().rstrip('=')
|
||||
|
||||
return f"{message}.{signature}"
|
||||
return f"{message}.{signature_b64}"
|
||||
|
||||
def create_jwt(self, keypair: dict) -> str:
|
||||
def create_jwt(self, new_did: str) -> str:
|
||||
"""
|
||||
Create a signed JWT for DID registration.
|
||||
|
||||
Creates a registration claim that includes:
|
||||
1. Admin DID as agent
|
||||
2. New DID as participant
|
||||
3. Timestamp and expiration
|
||||
4. Required credential context
|
||||
Create a signed JWT for DID registration
|
||||
|
||||
Args:
|
||||
keypair (dict): Generated keypair information
|
||||
|
||||
Returns:
|
||||
str: Signed JWT containing registration claim
|
||||
|
||||
Note:
|
||||
Matches TypeScript implementation exactly for compatibility
|
||||
new_did (str): The DID being registered
|
||||
"""
|
||||
now = int(time.time())
|
||||
|
||||
@@ -196,12 +194,11 @@ class DIDRegistration:
|
||||
register_claim = {
|
||||
"@context": "https://schema.org",
|
||||
"@type": "RegisterAction",
|
||||
"agent": { "did": self.admin_did },
|
||||
"participant": { "did": keypair['did'] },
|
||||
"agent": { "did": self.admin_keypair['did'] },
|
||||
"participant": { "did": new_did },
|
||||
"object": "endorser.ch"
|
||||
}
|
||||
|
||||
# Match the TypeScript vcPayload exactly - no iss field
|
||||
payload = {
|
||||
"iat": now,
|
||||
"exp": now + 300,
|
||||
@@ -215,8 +212,12 @@ class DIDRegistration:
|
||||
|
||||
print(f"\nDebug - JWT payload: {json.dumps(payload, indent=2)}")
|
||||
|
||||
# Sign with new DID's private key
|
||||
return self.sign_jwt(payload, keypair['private_key'], keypair['did'])
|
||||
# Sign with admin's private key
|
||||
return self.sign_jwt(
|
||||
payload,
|
||||
self.admin_keypair['private_key'],
|
||||
self.admin_keypair['did']
|
||||
)
|
||||
|
||||
def register_did(self, jwt_token: str) -> dict:
|
||||
"""
|
||||
@@ -281,30 +282,6 @@ class DIDRegistration:
|
||||
'error': f"Request failed: {str(e)}"
|
||||
}
|
||||
|
||||
def get_root_did_from_db(db_path: str = None) -> str:
|
||||
"""Get admin DID from the most recent registration."""
|
||||
try:
|
||||
db_path = db_path or str(Path.home() / '.endorser' / 'accounts.db')
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute("""
|
||||
SELECT issuer as did
|
||||
FROM registration
|
||||
WHERE type = 'RegisterAction'
|
||||
ORDER BY issuanceDate DESC
|
||||
LIMIT 1
|
||||
""")
|
||||
|
||||
result = cursor.fetchone()
|
||||
if not result:
|
||||
raise ValueError('No admin DID found in registration table')
|
||||
|
||||
return result[0]
|
||||
finally:
|
||||
if 'conn' in locals():
|
||||
conn.close()
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main entry point for DID generation script.
|
||||
@@ -315,7 +292,7 @@ def main():
|
||||
3. Result output and error display
|
||||
|
||||
Usage:
|
||||
python did_generator.py <admin_did>
|
||||
python did_generator.py [options]
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Generate a DID with admin authorization'
|
||||
@@ -326,38 +303,39 @@ def main():
|
||||
required=False
|
||||
)
|
||||
parser.add_argument(
|
||||
'--db-path',
|
||||
help='Path to SQLite database containing root DID',
|
||||
required=False
|
||||
'--api-url',
|
||||
help='Override API URL',
|
||||
default="https://test-api.endorser.ch/api/v2/claim"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
admin_keypair = {
|
||||
'did': 'did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F',
|
||||
'private_key': '2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b'
|
||||
}
|
||||
|
||||
admin_did = args.admin_did
|
||||
if not admin_did:
|
||||
try:
|
||||
admin_did = get_root_did_from_db(args.db_path)
|
||||
print(f"Found root DID in database: {admin_did}")
|
||||
except (FileNotFoundError, ValueError, sqlite3.Error) as e:
|
||||
print(f"Error: {str(e)}")
|
||||
print("Please provide --admin-did argument")
|
||||
return
|
||||
admin_did = admin_keypair['did']
|
||||
|
||||
print(f"Admin DID: {admin_did}")
|
||||
print(f"API URL: {args.api_url}")
|
||||
|
||||
print('Starting DID Generation...\n')
|
||||
registrar = DIDRegistration(admin_did)
|
||||
registrar = DIDRegistration(admin_keypair, args.api_url)
|
||||
|
||||
print("Generating new keypair...")
|
||||
keypair = registrar.create_keypair()
|
||||
new_keypair = registrar.create_keypair()
|
||||
|
||||
print("\nGenerated DID Details:")
|
||||
print("----------------------")
|
||||
print(f"DID: {keypair['did']}")
|
||||
print(f"DID: {new_keypair['did']}")
|
||||
print(f"Admin DID: {admin_did}")
|
||||
print(f"Address: {keypair['address']}")
|
||||
print(f"Private Key: {keypair['private_key']}")
|
||||
print(f"Public Key: {keypair['public_key']}\n")
|
||||
print(f"Address: {new_keypair['address']}")
|
||||
print(f"Private Key: {new_keypair['private_key']}")
|
||||
print(f"Public Key: {new_keypair['public_key']}\n")
|
||||
|
||||
print("Creating JWT...")
|
||||
jwt_token = registrar.create_jwt(keypair)
|
||||
jwt_token = registrar.create_jwt(new_keypair['did'])
|
||||
|
||||
print('\nSuccessfully generated DID with admin authorization!')
|
||||
print(f'Registration JWT: {jwt_token[:50]}...')
|
||||
|
||||
Reference in New Issue
Block a user