You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
182 lines
6.5 KiB
182 lines
6.5 KiB
1 week ago
|
from eth_account import Account
|
||
|
import json
|
||
|
import base64
|
||
|
from eth_account.messages import encode_defunct
|
||
|
from eth_keys import keys
|
||
|
import time
|
||
|
import requests
|
||
|
import argparse
|
||
|
import secrets
|
||
|
import hashlib
|
||
|
|
||
|
class DIDRegistration:
    """Generate a did:ethr keypair and register it through a claim API.

    The registration is a JWT (ES256K) carrying a ``RegisterAction``
    credential whose agent is ``admin_did`` and whose participant is the
    newly generated DID.  The JWT is POSTed as ``{"jwtEncoded": ...}`` to
    ``api_url``.
    """

    DEFAULT_API_URL = "https://api.endorser.ch/api/v2/claim"
    # Seconds to wait for the server before giving up; without a timeout
    # ``requests`` may block forever on an unresponsive host.
    DEFAULT_TIMEOUT = 30
    # Lifetime of the registration JWT (``exp - iat``), in seconds.
    JWT_TTL_SECONDS = 300

    def __init__(self, admin_did: str, api_url: str = DEFAULT_API_URL,
                 timeout: float = DEFAULT_TIMEOUT):
        """Store the admin DID and endpoint settings.

        Args:
            admin_did: DID placed in the claim's ``agent`` field.
            api_url: Endpoint that receives the signed JWT.
            timeout: Timeout in seconds applied to the HTTP request.
        """
        self.api_url = api_url
        self.admin_did = admin_did
        self.timeout = timeout
        Account.enable_unaudited_hdwallet_features()

    @staticmethod
    def _b64url(data: bytes) -> str:
        """Return *data* as unpadded URL-safe base64 text (JWT segment form)."""
        return base64.urlsafe_b64encode(data).decode().rstrip('=')

    @staticmethod
    def _key_bytes(private_key: str) -> bytes:
        """Decode a hex private key, tolerating an optional ``0x`` prefix."""
        if private_key.startswith(('0x', '0X')):
            private_key = private_key[2:]
        return bytes.fromhex(private_key)

    @staticmethod
    def _error_message(body) -> str:
        """Extract a human-readable message from a decoded JSON error body.

        Accepts ``{"error": {"message": ...}}`` as well as
        ``{"error": "..."}``; anything else yields ``'Unknown error'``.
        """
        error = body.get('error') if isinstance(body, dict) else None
        if isinstance(error, dict):
            return error.get('message', 'Unknown error')
        if isinstance(error, str) and error:
            return error
        return 'Unknown error'

    def _build_registration_payload(self, participant_did: str, issued_at: int) -> dict:
        """Build the unsigned JWT payload (no ``iss``; ``sign_jwt`` adds it)."""
        register_claim = {
            "@context": "https://schema.org",
            "@type": "RegisterAction",
            "agent": {"did": self.admin_did},
            "participant": {"did": participant_did},
            "object": "endorser.ch",
        }
        return {
            "iat": issued_at,
            "exp": issued_at + self.JWT_TTL_SECONDS,
            "sub": "RegisterAction",
            "vc": {
                "@context": ["https://www.w3.org/2018/credentials/v1"],
                "type": ["VerifiableCredential"],
                "credentialSubject": register_claim,
            },
        }

    def create_keypair(self):
        """Generate a new Ethereum keypair.

        Returns:
            dict with ``private_key`` (hex, no ``0x`` prefix), ``public_key``
            (``0x``-prefixed compressed form), ``address`` and ``did``
            (``did:ethr:<address>``).
        """
        private_key = secrets.token_hex(32)

        # Derive the compressed public key from the raw private key bytes.
        private_key_obj = keys.PrivateKey(bytes.fromhex(private_key))
        public_key = '0x' + private_key_obj.public_key.to_compressed_bytes().hex()

        # The account object is only needed for its address.
        account = Account.from_key('0x' + private_key)

        return {
            'private_key': private_key,  # No 0x prefix
            'public_key': public_key,    # With 0x prefix, compressed format
            'address': account.address,
            'did': f"did:ethr:{account.address}",
        }

    def sign_jwt(self, payload: dict, private_key: str, did: str) -> str:
        """Sign *payload* as a compact ES256K JWT issued by *did*.

        Args:
            payload: Claims to sign; ``iss`` is added (or overwritten) here.
            private_key: Hex-encoded private key, with or without ``0x``.
            did: Value written to the ``iss`` claim.

        Returns:
            ``header.payload.signature`` with unpadded base64url segments.

        Raises:
            ValueError: If *private_key* is not valid hex.
        """
        full_payload = {**payload, "iss": did}
        header = {"typ": "JWT", "alg": "ES256K"}

        # Compact separators keep the signed bytes free of whitespace.
        header_b64 = self._b64url(json.dumps(header, separators=(',', ':')).encode())
        payload_b64 = self._b64url(json.dumps(full_payload, separators=(',', ':')).encode())
        message = f"{header_b64}.{payload_b64}"

        # The SHA-256 digest is signed directly (not wrapped as an Ethereum
        # personal message).
        message_hash = hashlib.sha256(message.encode()).digest()

        # Public eth_keys API; avoids depending on the private
        # Account._sign_hash helper.
        signed = keys.PrivateKey(self._key_bytes(private_key)).sign_msg_hash(message_hash)

        # The signature segment is r || s, each as a 32-byte big-endian integer.
        signature_bytes = signed.r.to_bytes(32, 'big') + signed.s.to_bytes(32, 'big')

        return f"{message}.{self._b64url(signature_bytes)}"

    def create_jwt(self, keypair: dict) -> str:
        """Create the signed registration JWT for *keypair*.

        Args:
            keypair: Mapping with at least ``did`` and ``private_key``, as
                returned by ``create_keypair``.

        Returns:
            The compact JWT, signed with the new DID's own private key.
        """
        payload = self._build_registration_payload(keypair['did'], int(time.time()))

        print(f"\nDebug - JWT payload: {json.dumps(payload, indent=2)}")

        # Sign with new DID's private key
        return self.sign_jwt(payload, keypair['private_key'], keypair['did'])

    def register_did(self, jwt_token: str) -> dict:
        """Submit the registration JWT to the server.

        Returns:
            dict with ``success`` (bool).  On success ``response`` holds the
            decoded JSON body (or the raw text if the body is not JSON).  On
            failure ``error`` holds a message and, when the server answered,
            ``response`` holds its body.
        """
        try:
            response = requests.post(
                self.api_url,
                json={"jwtEncoded": jwt_token},
                headers={'Content-Type': 'application/json'},
                timeout=self.timeout,
            )
        except requests.RequestException as e:
            # Connection errors, timeouts, invalid URLs, ...
            return {
                'success': False,
                'error': f"Request failed: {str(e)}"
            }

        print(f"\nServer Response Status: {response.status_code}")
        print(f"Server Response Body: {response.text}")

        # Depending on the requests version the decode error is either
        # json.JSONDecodeError or a plain ValueError; ValueError covers both.
        try:
            body = response.json()
        except ValueError:
            body = None

        if response.status_code in (200, 201):
            # The status code decides success; fall back to the raw text if
            # the body could not be decoded.
            return {
                'success': True,
                'response': body if body is not None else response.text
            }

        if body is None:
            return {
                'success': False,
                'error': f"Registration failed ({response.status_code}): {response.text}",
                'response': response.text
            }

        return {
            'success': False,
            'error': f"Registration failed ({response.status_code}): {self._error_message(body)}",
            'response': body
        }
|
||
|
|
||
|
def main() -> int:
    """Generate a DID, build its registration JWT and submit it.

    Reads the admin DID from the command line, prints the generated key
    material and the server's answer.

    Returns:
        0 when the registration succeeded, 1 otherwise, so the result can be
        used as the process exit status.
    """
    parser = argparse.ArgumentParser(description='Generate a DID with admin authorization')
    parser.add_argument('admin_did',
                        help='Admin DID (e.g., did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F)')
    args = parser.parse_args()

    print('Starting DID Generation...\n')
    print(f"Using admin DID: {args.admin_did}")
    registrar = DIDRegistration(args.admin_did)

    print("Generating new keypair...")
    keypair = registrar.create_keypair()

    # NOTE: this prints the private key to stdout; treat the output as secret.
    print("\nGenerated DID Details:")
    print("----------------------")
    print(f"DID: {keypair['did']}")
    print(f"Admin DID: {args.admin_did}")
    print(f"Address: {keypair['address']}")
    print(f"Private Key: {keypair['private_key']}")
    print(f"Public Key: {keypair['public_key']}\n")

    print("Creating JWT...")
    jwt_token = registrar.create_jwt(keypair)

    print('\nSuccessfully generated DID with admin authorization!')
    print(f'Registration JWT: {jwt_token[:50]}...')

    print("\nAttempting registration...")
    result = registrar.register_did(jwt_token)
    if result['success']:
        print("Registration successful!")
        print(f"Response: {json.dumps(result['response'], indent=2)}")
        return 0

    print("Registration failed!")
    print(f"Error: {result['error']}")
    if 'response' in result:
        print(f"Full response: {json.dumps(result['response'], indent=2)}")
    return 1


if __name__ == "__main__":
    # Propagate failure to the shell instead of always exiting with 0.
    raise SystemExit(main())
|