You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
142 lines
4.5 KiB
142 lines
4.5 KiB
1 week ago
|
import json
|
||
|
import time
|
||
|
from mnemonic import Mnemonic
|
||
|
from eth_account import Account
|
||
|
import jwt
|
||
|
import requests
|
||
|
from typing import Tuple, Dict, Any
|
||
|
|
||
|
# Constants
|
||
|
# HD-wallet path used by derive_address() when the caller does not pass one.
DEFAULT_ROOT_DERIVATION_PATH: str = "m/84737769'/0'/0'/0'"

# Base URL that register() posts claims to by default.
API_SERVER: str = "https://test-api.endorser.ch"  # Replace with your endorser API URL
|
||
|
|
||
|
# Step 1: Generate a mnemonic seed phrase
|
||
|
def generate_mnemonic() -> str:
    """Return a freshly generated English mnemonic seed phrase.

    The phrase is requested with ``strength=256`` (a 24-word mnemonic).
    """
    return Mnemonic("english").generate(strength=256)
|
||
|
|
||
|
# Step 2: Derive Ethereum address and keys from mnemonic
|
||
|
def derive_address(mnemonic: str, derivation_path: str = DEFAULT_ROOT_DERIVATION_PATH) -> Tuple[str, str, str, str]:
    """Derive the Ethereum address and key pair for a mnemonic.

    Args:
        mnemonic: Seed phrase. Surrounding whitespace is stripped and the
            text is lower-cased before it is used.
        derivation_path: HD-wallet path handed to eth-account as
            ``account_path``.

    Returns:
        ``(address, private_hex, public_hex, derivation_path)``. Both hex
        strings are returned without a ``0x`` prefix.
    """
    mnemonic = mnemonic.strip().lower()

    # eth-account refuses to run from_mnemonic() until its HD-wallet support
    # has been switched on explicitly; the call is a simple flag flip and is
    # safe to repeat.
    Account.enable_unaudited_hdwallet_features()
    account = Account.from_mnemonic(mnemonic, account_path=derivation_path)

    # bytes.hex() never emits "0x", so nothing has to be sliced off and the
    # result does not depend on how the installed hexbytes formats .hex().
    private_hex = bytes(account.key).hex()

    # ``account.key`` is raw key bytes and has no ``public_key`` attribute.
    # The key object that does expose it is held on the account itself.
    # NOTE(review): ``_key_obj`` is a private eth-account attribute — confirm
    # it is present in the pinned eth-account version.
    public_hex = account._key_obj.public_key.to_hex()
    if public_hex.startswith("0x"):
        public_hex = public_hex[2:]

    return account.address, private_hex, public_hex, derivation_path
|
||
|
|
||
|
# Step 3: Create a DID identifier
|
||
|
def new_identifier(address: str, public_key_hex: str, private_key_hex: str, derivation_path: str) -> Dict[str, Any]:
    """Assemble the identifier record for a ``did:ethr`` DID.

    Args:
        address: Ethereum address; appended to ``did:ethr:`` to form the DID.
        public_key_hex: Stored as ``publicKeyHex`` on the single key entry.
        private_key_hex: Stored as ``privateKeyHex`` on the single key entry.
        derivation_path: Accepted for interface compatibility; not written
            into the returned record.

    Returns:
        A dict with ``did``, a one-element ``keys`` list and an empty
        ``services`` list.
    """
    did = "did:ethr:{}".format(address)

    # Keyword order is kept deliberately so serialized output is unchanged.
    verification_key = dict(
        id="{}#keys-1".format(did),
        type="Secp256k1VerificationKey2018",
        controller=did,
        ethereumAddress=address,
        publicKeyHex=public_key_hex,
        privateKeyHex=private_key_hex,
    )
    return dict(did=did, keys=[verification_key], services=[])
|
||
|
|
||
|
# Step 4: Initialize a new account (DID creation)
|
||
|
def initialize_account() -> Dict[str, Any]:
    """Create a brand-new DID identity and print the record that would be stored.

    Generates a mnemonic, derives the address and keys from it, builds the
    identifier record, prints an account summary, and returns the identifier.

    Returns:
        The identifier dict produced by ``new_identifier``.
    """
    seed_phrase = generate_mnemonic()
    addr, priv_hex, pub_hex, path = derive_address(seed_phrase)
    identity = new_identifier(addr, pub_hex, priv_hex, path)

    # Stand-in for persistence (a real application would save this to a database).
    # NOTE(review): this record holds the mnemonic and, inside the serialized
    # identity, the private key — printing it exposes both on stdout.
    stored_record = dict(
        did=identity["did"],
        identity=json.dumps(identity),
        mnemonic=seed_phrase,
        derivation_path=path,
    )
    print("Account initialized:", stored_record)

    return identity
|
||
|
|
||
|
# Step 5: Create a Verifiable Credential JWT
|
||
|
def _secp256k1_pem_from_hex(private_key_hex: str) -> str:
    """Wrap a hex-encoded secp256k1 private key in a SEC1 ``EC PRIVATE KEY`` PEM."""
    import base64  # local import: only this helper needs it

    hex_digits = private_key_hex.strip()
    if hex_digits[:2].lower() == "0x":
        hex_digits = hex_digits[2:]

    raw = bytes.fromhex(hex_digits)  # ValueError on malformed hex
    if not 1 <= len(raw) <= 32:
        raise ValueError("private key must be between 1 and 32 bytes of hex")
    scalar = raw.rjust(32, b"\x00")  # left-pad so the OCTET STRING is always 32 bytes

    # Minimal DER for the ECPrivateKey structure (public key field omitted).
    der = b"".join([
        b"\x30\x2e",                              # SEQUENCE, 46 bytes follow
        b"\x02\x01\x01",                          # INTEGER 1 (version)
        b"\x04\x20", scalar,                      # OCTET STRING, 32-byte key
        b"\xa0\x07\x06\x05\x2b\x81\x04\x00\x0a",  # [0] OID 1.3.132.0.10 (secp256k1)
    ])
    body = base64.b64encode(der).decode("ascii")  # 48 bytes -> one 64-char line
    return (
        "-----BEGIN EC PRIVATE KEY-----\n"
        + body
        + "\n-----END EC PRIVATE KEY-----\n"
    )


def create_endorser_jwt(did: str, private_key_hex: str, payload: Dict[str, Any], expires_in: int = 3600) -> str:
    """Sign *payload* as an ES256K JWT issued by *did*.

    Args:
        did: Written to the ``iss`` claim.
        private_key_hex: Hex-encoded secp256k1 private key (an optional
            ``0x`` prefix is tolerated).
        payload: Extra claims merged into the token. They are applied after
            ``iat``/``exp``/``iss`` and therefore win on a key clash.
        expires_in: Lifetime in seconds, added to the current time for ``exp``.

    Returns:
        The encoded JWT as a ``str``.

    Raises:
        ValueError: If ``private_key_hex`` is not valid hex or is longer
            than 32 bytes.
    """
    issued_at = int(time.time())
    claims = {
        "iat": issued_at,
        "exp": issued_at + expires_in,
        "iss": did,
        **payload,
    }

    # PyJWT's EC algorithms take a PEM/SSH-formatted key (or a cryptography
    # key object); raw scalar bytes are rejected, so wrap the key in PEM.
    # NOTE(review): relies on the backend accepting a SEC1 key without the
    # optional public-key field — verify against the installed cryptography.
    signing_key = _secp256k1_pem_from_hex(private_key_hex)

    token = jwt.encode(
        claims,
        signing_key,
        algorithm="ES256K",
        headers={"typ": "JWT", "alg": "ES256K"},
    )

    # PyJWT 1.x returns bytes, 2.x returns str; always hand back str so the
    # token can be dropped straight into a JSON body.
    if isinstance(token, bytes):
        token = token.decode("ascii")
    return token
|
||
|
|
||
|
# Step 6: Create and submit a registration claim
|
||
|
def register(active_did: str, private_key_hex: str, api_server: str = API_SERVER, timeout: float = 30.0) -> Dict[str, Any]:
    """Build a registration credential for *active_did*, sign it and submit it.

    Args:
        active_did: DID used both as the credential subject and as the JWT
            issuer.
        private_key_hex: Hex-encoded private key used to sign the JWT.
        api_server: Base URL of the API; the claim is posted to
            ``<api_server>/api/v2/claim``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        ``{"success": True}`` when the server answers with a successful
        status and a truthy ``success`` field. Otherwise a dict with an
        ``"error"`` message, plus ``"details"`` when a response body was
        received.
    """
    # Simple VC payload (replace with your specific claim).
    vc_payload = {
        "vc": {
            "@context": ["https://www.w3.org/2018/credentials/v1"],
            "type": ["VerifiableCredential"],
            "credentialSubject": {
                "id": active_did,
                "status": "registered",
            },
        }
    }
    jwt_token = create_endorser_jwt(active_did, private_key_hex, vc_payload)

    # Strip a trailing slash so a base URL given as ".../" does not yield "//api".
    url = f"{api_server.rstrip('/')}/api/v2/claim"

    try:
        response = requests.post(
            url,
            json={"jwtEncoded": jwt_token},
            headers={"Content-Type": "application/json"},
            timeout=timeout,  # without this a stalled server blocks forever
        )
    except requests.RequestException as e:
        return {"error": f"Error submitting claim: {str(e)}"}

    try:
        response_data = response.json()
    except ValueError:
        # Body was not JSON (e.g. an HTML error page); report it verbatim.
        return {"error": "Registration failed", "details": response.text}

    # Accept any successful status, not only 200.
    # NOTE(review): presumably the API may answer 201 for a created claim — confirm.
    if response.ok and isinstance(response_data, dict) and response_data.get("success"):
        return {"success": True}
    return {"error": "Registration failed", "details": response_data}
|
||
|
|
||
|
# Main execution
|
||
|
def main():
    """Create a new DID, attempt to register it, and print the outcome."""
    # Step 1: Create a new DID
    identity = initialize_account()

    # Step 2: Register the DID using the first key on the identity
    outcome = register(identity["did"], identity["keys"][0]["privateKeyHex"])
    print("Registration result:", outcome)


# Run the demo flow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|