feat: implement DID registration with JWT signing

- Add full DID registration flow matching TypeScript version
- Implement ES256K JWT signing with PEM key format
- Add async/await support for JWT operations
- Improve error handling and debug output
- Add rich documentation and type hints

Technical Changes:
- Convert private key to PEM format for JWT signing
- Match TypeScript's JWT payload structure
- Add proper JWT header with ES256K algorithm
- Implement async functions for JWT creation
- Add detailed debug output for JWT parts

Documentation:
- Add module-level docstring with flow description
- Add function-level docstrings with examples
- Document security considerations
- Add technical details and error handling info

Dependencies:
- Add cryptography for key format conversion
- Add jwcrypto for JWT operations
- Update requirements.txt with versions and comments

This commit implements the complete DID registration flow,
matching the TypeScript implementation's behavior and
adding comprehensive documentation and error handling.
This commit is contained in:
Matthew Raymer
2025-03-05 13:44:42 +00:00
parent ddcf674b94
commit 2da44c4de6
6 changed files with 2081 additions and 65 deletions

View File

@@ -0,0 +1 @@
usage gas they pyramid walnut mammal absorb major crystal nurse element congress assist panic bomb entire area slogan film educate decrease buddy describe finish

View File

@@ -1,38 +1,121 @@
"""
DID Creation and Registration Flow
@author Matthew Raymer
This module implements the creation and registration of Decentralized Identifiers (DIDs)
with the endorser.ch service. It handles the complete workflow from mnemonic generation
to DID registration.
Flow:
1. Generate or load mnemonic seed phrase
2. Derive Ethereum keys and address
3. Create DID identifier
4. Initialize account
5. Create signed JWT
6. Register DID with endorser service
Features:
- Secure mnemonic generation and storage
- HD wallet key derivation
- ES256K-R JWT signing
- DID registration with endorser.ch
- Persistent mnemonic storage
Dependencies:
- mnemonic: For seed phrase generation
- eth_account: For Ethereum account operations
- eth_keys: For key manipulation
- web3: For Ethereum utilities
- requests: For API communication
Usage:
python new_flow.py
Environment:
API_SERVER: Endorser API endpoint (defaults to test server)
"""
import json
import time
from mnemonic import Mnemonic
from eth_account import Account
from eth_account.hdaccount import generate_mnemonic
import jwt
from eth_account._utils.signing import sign_message_hash
from eth_utils.curried import keccak
import requests
from typing import Tuple, Dict, Any
import os
from web3 import Web3
from eth_keys import keys
import base64
import json
from jwcrypto import jwk
import asyncio
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
# Constants
DEFAULT_ROOT_DERIVATION_PATH = "m/84737769'/0'/0'/0'"
API_SERVER = "https://test-api.endorser.ch" # Replace with your endorser API URL
DEFAULT_ROOT_DERIVATION_PATH = "m/84737769'/0'/0'/0'" # Custom derivation path for TimeSafari
API_SERVER = "https://test-api.endorser.ch" # Endorser API endpoint
ENDORSER_DID = "did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F" # Keep original case
ENDORSER_PRIVATE_KEY = "2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b"
# Step 1: Generate a mnemonic seed phrase
def generate_mnemonic() -> str:
mnemo = Mnemonic("english")
return mnemo.generate(strength=256) # 24-word mnemonic
# Step 2: Derive Ethereum address and keys from mnemonic
def derive_address(mnemonic: str, derivation_path: str = DEFAULT_ROOT_DERIVATION_PATH) -> Tuple[str, str, str, str]:
def derive_address(
mnemonic: str,
derivation_path: str = DEFAULT_ROOT_DERIVATION_PATH) -> Tuple[str, str, str, str]:
"""
Derive Ethereum address and keys from a mnemonic phrase.
Args:
mnemonic (str): The mnemonic seed phrase
derivation_path (str): HD wallet derivation path
Returns:
Tuple[str, str, str, str]: (address, private_key_hex, public_key_hex, derivation_path)
"""
# Enable HD wallet features before using mnemonic
Account.enable_unaudited_hdwallet_features()
mnemonic = mnemonic.strip().lower()
seed = Mnemonic("english").to_seed(mnemonic)
# Derive the account using eth-account (simplified, assumes single derivation)
# Derive the account using eth-account
account = Account.from_mnemonic(mnemonic, account_path=derivation_path)
address = account.address
private_hex = account.privateKey.hex()[2:] # Remove '0x'
public_hex = account.key.public_key.to_hex()[2:] # Remove '0x'
private_key = account.key
# Format private key without '0x' prefix
private_hex = private_key.hex()[2:] if private_key.hex().startswith('0x') else private_key.hex()
# Derive public key from private key
pk = keys.PrivateKey(private_key)
public_key = pk.public_key
public_hex = public_key.to_hex()[2:] # Remove '0x' prefix
return address, private_hex, public_hex, derivation_path
# Step 3: Create a DID identifier
def new_identifier(address: str, public_key_hex: str, private_key_hex: str, derivation_path: str) -> Dict[str, Any]:
def new_identifier(
address: str,
public_key_hex: str,
private_key_hex: str,
derivation_path: str) -> Dict[str, Any]:
"""
Create a new DID identifier with associated keys.
Args:
address (str): Ethereum address
public_key_hex (str): Public key in hex format
private_key_hex (str): Private key in hex format
derivation_path (str): HD wallet derivation path
Returns:
Dict[str, Any]: DID identifier object with keys and services
"""
did = f"did:ethr:{address}"
key_id = f"{did}#keys-1"
identifier = {
"did": did,
"keys": [{
@@ -47,31 +130,62 @@ def new_identifier(address: str, public_key_hex: str, private_key_hex: str, deri
}
return identifier
# Step 4: Initialize a new account (DID creation)
def initialize_account() -> Dict[str, Any]:
# Generate mnemonic
mnemonic = generate_mnemonic()
# Derive keys and address
"""
Initialize a new DID account or load existing one.
This function either generates a new mnemonic or loads an existing one
from mnemonic.txt, then derives the necessary keys and creates a DID.
Returns:
Dict[str, Any]: The initialized DID identity object
Side effects:
- Creates/reads mnemonic.txt file
- Prints account initialization details
"""
Account.enable_unaudited_hdwallet_features()
# Generate or load mnemonic
if not os.path.exists('mnemonic.txt'):
mnemonic = generate_mnemonic(lang="english", num_words=24)
with open('mnemonic.txt', 'w', encoding='utf-8') as f:
f.write(mnemonic)
else:
with open('mnemonic.txt', 'r', encoding='utf-8') as f:
mnemonic = f.read().strip()
# Derive keys and create identity
address, private_hex, public_hex, derivation_path = derive_address(mnemonic)
# Create DID identifier
identity = new_identifier(address, public_hex, private_hex, derivation_path)
# Simulate storing (in a real app, save to a database)
# Format account data
account_data = {
"did": identity["did"],
"identity": json.dumps(identity),
"identity": identity,
"mnemonic": mnemonic,
"derivation_path": derivation_path
}
print("Account initialized:", account_data)
print("\nAccount initialized:")
print(json.dumps(account_data, indent=2))
print()
return identity
# Step 5: Create a Verifiable Credential JWT
def create_endorser_jwt(did: str, private_key_hex: str, payload: Dict[str, Any], expires_in: int = 3600) -> str:
async def create_endorser_jwt(
did: str,
private_key_hex: str,
payload: Dict[str, Any],
sub_did: str = None,
expires_in: int = 3600) -> str:
"""Create a signed JWT for DID registration."""
# Prepare JWT headers and payload
headers = {"typ": "JWT", "alg": "ES256K"}
headers = {
"typ": "JWT",
"alg": "ES256K"
}
current_time = int(time.time())
jwt_payload = {
"iat": current_time,
@@ -79,64 +193,210 @@ def create_endorser_jwt(did: str, private_key_hex: str, payload: Dict[str, Any],
"iss": did,
**payload
}
# Convert private key to PEM format
private_key_int = int(private_key_hex, 16)
private_key = ec.derive_private_key(
private_key_int,
ec.SECP256K1()
)
# Sign the JWT using the private key
private_key_bytes = bytes.fromhex(private_key_hex)
jwt_token = jwt.encode(
# Serialize to PEM
pem_private = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# Sign using PyJWT with custom signing
token = jwt.encode(
jwt_payload,
private_key_bytes,
pem_private,
algorithm="ES256K",
headers=headers
)
return jwt_token
return token
# Step 6: Create and submit a registration claim
def register(active_did: str, private_key_hex: str, api_server: str = API_SERVER) -> Dict[str, Any]:
# Create a simple VC payload (replace with your specific claim)
vc_payload = {
async def register(
active_did: str,
private_key_hex: str,
api_server: str = API_SERVER) -> Dict[str, Any]:
"""
Register a DID with the endorser service by submitting a signed verifiable credential.
This function creates and submits a registration claim to the endorser service. The process:
1. Creates a Verifiable Credential (VC) for registration
2. Signs the VC using ES256K-R JWT format
3. Submits the signed JWT to the endorser API
4. Handles the response and any potential errors
Args:
active_did (str): The DID to register (e.g., "did:ethr:0x...")
private_key_hex (str): Private key in hex format for signing the JWT
api_server (str, optional): Endorser API endpoint. Defaults to API_SERVER.
Returns:
Dict[str, Any]: Registration result with either:
{"success": True} for successful registration
{"error": str, "details": Dict} for failures
Raises:
requests.RequestException: For network or API communication errors
Example Success:
{
"success": True
}
Example Failure:
{
"error": "Registration failed",
"details": {
"error": {
"message": "JWT verification failed",
"code": "JWT_VERIFY_FAILED"
}
}
}
Technical Details:
- Uses ES256K-R JWT signing
- Follows W3C Verifiable Credentials data model
- Implements proper error handling and reporting
- Supports custom API endpoints for testing
"""
print("\nEndorser DID:", ENDORSER_DID)
print("Active DID:", active_did)
# Create registration claim matching TypeScript version
vcPayload = {
"vc": {
"@context": ["https://www.w3.org/2018/credentials/v1"],
"type": ["VerifiableCredential"],
"credentialSubject": {
"id": active_did,
"status": "registered"
"@context": "https://schema.org",
"@type": "RegisterAction",
"agent": {
"identifier": ENDORSER_DID
},
"participant": {
"identifier": active_did
},
"object": "endorser.ch",
"endTime": int(time.time()) + 7*24*60*60 # Next week epoch
}
}
}
# Sign the VC as a JWT
jwt_token = create_endorser_jwt(active_did, private_key_hex, vc_payload)
# Submit to the API
url = f"{api_server}/api/v2/claim"
payload = {"jwtEncoded": jwt_token}
# Debug output
print("\nRegistration Claim:")
print(json.dumps(vcPayload, indent=2))
print()
# Sign with endorser's DID and private key
jwt_token = await create_endorser_jwt(
did=ENDORSER_DID,
private_key_hex=ENDORSER_PRIVATE_KEY,
payload=vcPayload,
sub_did=active_did
)
# Debug output
print("Generated JWT:")
print(jwt_token)
print()
# Debug JWT parts
header, payload, signature = jwt_token.split('.')
print("\nJWT Header:", json.loads(base64.urlsafe_b64decode(header + '==').decode()))
print("JWT Payload:", json.loads(base64.urlsafe_b64decode(payload + '==').decode()))
print("JWT Signature length:", len(base64.urlsafe_b64decode(signature + '==')), "bytes")
print()
try:
response = requests.post(
url,
json=payload,
f"{api_server}/api/v2/claim",
json={"jwtEncoded": jwt_token},
headers={"Content-Type": "application/json"}
)
response_data = response.json()
if response.status_code == 200 and response_data.get("success"):
return {"success": True}
else:
return {"error": "Registration failed", "details": response_data}
if response.status_code == 200 and response_data.get("success", {}).get("handleId"):
return {
"success": True,
"handleId": response_data["success"]["handleId"],
"registrationId": response_data["success"]["registrationId"],
"claimId": response_data["success"]["claimId"]
}
# Log full error response
print("\nAPI Error Response:")
print(json.dumps(response_data, indent=2))
print()
# Handle partial success
if response_data.get("success"):
return {
"success": True,
**response_data["success"]
}
return {"error": "Registration failed", "details": response_data}
except requests.RequestException as e:
print(f"\nAPI Request Error: {str(e)}")
return {"error": f"Error submitting claim: {str(e)}"}
# Main execution
def main():
async def main():
"""
Main execution flow for DID creation and registration.
This function orchestrates the complete DID lifecycle:
1. Creates or loads a DID identity
2. Extracts the necessary credentials
3. Registers the DID with the endorser service
The process involves:
- Generating/loading a mnemonic seed phrase
- Deriving Ethereum keys and address
- Creating a DID identifier
- Signing registration claims
- Submitting to the endorser API
Side Effects:
- Creates mnemonic.txt if it doesn't exist
- Prints account initialization details
- Prints registration results
Error Handling:
- Handles API communication errors
- Reports registration failures
- Provides detailed error messages
Example Output:
```
Account initialized:
{
"did": "did:ethr:0x...",
"identity": {...},
"mnemonic": "word1 word2 ...",
"derivation_path": "m/84737769'/0'/0'/0'"
}
Registration result: {"success": true}
```
"""
# Step 1: Create a new DID
identity = initialize_account()
active_did = identity["did"]
private_key_hex = identity["keys"][0]["privateKeyHex"]
# Step 2: Register the DID
result = register(active_did, private_key_hex)
result = await register(active_did, private_key_hex)
print("Registration result:", result)
if __name__ == "__main__":
main()
asyncio.run(main())

335
test-scripts/new_flow.ts Normal file
View File

@@ -0,0 +1,335 @@
/**
* DID Creation and Registration Flow
* @author Matthew Raymer
*
* This module implements the creation and registration of Decentralized Identifiers (DIDs)
* with the endorser.ch service. It matches the Python implementation in new_flow.py.
*/
import { HDNode } from '@ethersproject/hdnode';
import { Wallet } from '@ethersproject/wallet';
import { createJWT, ES256KSigner, SimpleSigner } from 'did-jwt';
import axios from 'axios';
// Constants
const DEFAULT_ROOT_DERIVATION_PATH = "m/84737769'/0'/0'/0'";
const API_SERVER = "https://test-api.endorser.ch";
const ENDORSER_DID = "did:ethr:0x0000694B58C2cC69658993A90D3840C560f2F51F";
const ENDORSER_PRIVATE_KEY = "2b6472c026ec2aa2c4235c994a63868fc9212d18b58f6cbfe861b52e71330f5b";
interface IKey {
id: string;
type: string;
controller: string;
ethereumAddress: string;
publicKeyHex: string;
privateKeyHex: string;
}
interface IIdentifier {
did: string;
keys: IKey[];
services: any[];
}
/**
* Generate a new mnemonic seed phrase
*/
function generateMnemonic(): string {
return Wallet.createRandom().mnemonic.phrase;
}
/**
* Derive Ethereum address and keys from a mnemonic phrase
*/
function deriveAddress(
mnemonic: string,
derivationPath: string = DEFAULT_ROOT_DERIVATION_PATH
): [string, string, string, string] {
mnemonic = mnemonic.trim().toLowerCase();
const hdnode: HDNode = HDNode.fromMnemonic(mnemonic);
const rootNode: HDNode = hdnode.derivePath(derivationPath);
const privateHex = rootNode.privateKey.substring(2); // remove '0x'
const publicHex = rootNode.publicKey.substring(2); // remove '0x'
const address = rootNode.address;
return [address, privateHex, publicHex, derivationPath];
}
/**
* Create a new DID identifier with associated keys
*/
function newIdentifier(
address: string,
publicKeyHex: string,
privateKeyHex: string,
derivationPath: string
): IIdentifier {
const did = `did:ethr:${address}`;
const keyId = `${did}#keys-1`;
const identifier: IIdentifier = {
did,
keys: [{
id: keyId,
type: "Secp256k1VerificationKey2018",
controller: did,
ethereumAddress: address,
publicKeyHex,
privateKeyHex
}],
services: []
};
return identifier;
}
/**
* Initialize a new DID account or load existing one
*/
async function initializeAccount(): Promise<IIdentifier> {
const mnemonic = generateMnemonic();
const [address, privateHex, publicHex, derivationPath] = deriveAddress(mnemonic);
const identity = newIdentifier(address, publicHex, privateHex, derivationPath);
// Format and display account data
const accountData = {
did: identity.did,
identity,
mnemonic,
derivation_path: derivationPath
};
console.log("\nAccount initialized:");
console.log(JSON.stringify(accountData, null, 2));
console.log();
return identity;
}
/**
* Create a signed JWT for DID registration
*/
async function createEndorserJwt(
did: string,
privateKeyHex: string,
payload: any,
subDid?: string,
expiresIn: number = 3600
): Promise<string> {
const signer = await SimpleSigner(privateKeyHex);
const jwt = await createJWT(
payload,
{
issuer: did,
signer,
expiresIn,
...(subDid && { subject: subDid })
}
);
return jwt;
}
/**
* Register a DID with the endorser service
*/
async function register(
activeDid: string,
privateKeyHex: string,
apiServer: string = API_SERVER
): Promise<{
success?: boolean;
handleId?: string;
registrationId?: number;
claimId?: string;
error?: string;
details?: any;
}> {
console.log("\nEndorser DID:", ENDORSER_DID);
console.log("Active DID:", activeDid);
const vcPayload = {
vc: {
"@context": ["https://www.w3.org/2018/credentials/v1"],
type: ["VerifiableCredential"],
credentialSubject: {
"@context": "https://schema.org",
"@type": "RegisterAction",
"agent": {
"identifier": ENDORSER_DID
},
"participant": {
"identifier": activeDid
},
"object": "endorser.ch",
"endTime": Math.floor(Date.now() / 1000) + 7 * 24 * 60 * 60
}
}
};
// Debug output
console.log("\nRegistration Claim:");
console.log(JSON.stringify(vcPayload, null, 2));
console.log();
// Sign with endorser's DID and private key
const jwtToken = await createEndorserJwt(
ENDORSER_DID,
ENDORSER_PRIVATE_KEY,
vcPayload,
activeDid
);
// Debug output
console.log("Generated JWT:");
console.log(jwtToken);
console.log();
// Debug JWT parts
const [header, payload, signature] = jwtToken.split('.');
console.log("\nJWT Header:", JSON.parse(Buffer.from(header, 'base64url').toString()));
console.log("JWT Payload:", JSON.parse(Buffer.from(payload, 'base64url').toString()));
console.log("JWT Signature length:", Buffer.from(signature, 'base64url').length, "bytes");
console.log();
try {
const response = await axios.post(
`${apiServer}/api/v2/claim`,
{ jwtEncoded: jwtToken },
{ headers: { 'Content-Type': 'application/json' } }
);
if (response.status === 200 && response.data.success?.handleId) {
// Return all success details
return {
success: true,
handleId: response.data.success.handleId,
registrationId: response.data.success.registrationId,
claimId: response.data.success.claimId
};
}
// Log full error response
console.error("\nAPI Error Response:");
console.error(JSON.stringify(response.data, null, 2));
console.error();
// If we have success data but no handleId, it might be a partial success
if (response.data.success) {
return {
success: true,
...response.data.success
};
} else {
return { error: "Registration failed", details: response.data };
}
} catch (error: any) {
// Log detailed error information
console.error("\nAPI Request Error:");
if (error.response) {
console.error("Status:", error.response.status);
console.error("Headers:", error.response.headers);
console.error("Data:", JSON.stringify(error.response.data, null, 2));
} else if (error.request) {
console.error("No response received");
console.error(error.request);
} else {
console.error("Error setting up request:", error.message);
}
console.error();
return { error: `Error submitting claim: ${error.message}` };
}
}
/**
* Fetch claim details from the endorser service
*/
async function fetchClaim(
claimId: string,
activeDid: string,
privateKeyHex: string,
apiServer: string = API_SERVER
): Promise<any> {
// Create authentication JWT
const authPayload = {
intent: 'claim.read',
claimId: claimId
};
// Sign with the active DID (not endorser DID)
const authToken = await createEndorserJwt(
activeDid,
privateKeyHex,
authPayload
);
try {
const response = await axios.get(
`${apiServer}/api/claim/byHandle/${claimId}`,
{
headers: {
'Authorization': `Bearer ${authToken}`,
'Content-Type': 'application/json'
}
}
);
return response.data;
} catch (error: any) {
console.error("\nError fetching claim:");
if (error.response) {
console.error("Status:", error.response.status);
console.error("Data:", JSON.stringify(error.response.data, null, 2));
} else {
console.error(error.message);
}
throw error;
}
}
/**
* Main execution flow
*/
async function main() {
try {
// Create a new DID
const identity = await initializeAccount();
const activeDid = identity.did;
const privateKeyHex = identity.keys[0].privateKeyHex;
// Register the DID
const result = await register(activeDid, privateKeyHex);
console.log("Registration result:", result);
// If registration was successful, fetch the claim details
if (result.success && result.claimId) {
console.log("\nFetching claim details...");
const claimDetails = await fetchClaim(
result.claimId,
activeDid,
privateKeyHex
);
console.log("\nClaim Details:");
console.log(JSON.stringify(claimDetails, null, 2));
}
} catch (error: any) {
console.error("Error:", error.message);
}
}
// Run if called directly
if (require.main === module) {
main().catch(console.error);
}
export {
generateMnemonic,
deriveAddress,
newIdentifier,
initializeAccount,
createEndorserJwt,
register,
fetchClaim
};

1392
test-scripts/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

23
test-scripts/package.json Normal file
View File

@@ -0,0 +1,23 @@
{
"name": "did-registration",
"version": "1.0.0",
"description": "DID creation and registration flow",
"main": "new_flow.js",
"scripts": {
"start": "ts-node new_flow.ts",
"build": "tsc"
},
"author": "Matthew Raymer",
"license": "MIT",
"dependencies": {
"@ethersproject/hdnode": "^5.7.0",
"@ethersproject/wallet": "^5.7.0",
"axios": "^1.6.2",
"did-jwt": "^6.11.6"
},
"devDependencies": {
"@types/node": "^20.10.0",
"ts-node": "^10.9.1",
"typescript": "^5.3.2"
}
}

View File

@@ -1,6 +1,11 @@
eth-account>=0.8.0
eth-keys>=0.4.0
PyJWT>=2.8.0
requests>=2.31.0
cryptography>=41.0.0
python-dotenv==1.0.0
# DID Generator Python Requirements
mnemonic>=0.20 # For seed phrase generation
eth-account>=0.5.9 # For Ethereum account operations
eth-keys>=0.4.0 # For key manipulation
requests>=2.28.2 # For API communication
typing-extensions>=4.7.1 # For type hints
web3>=6.0.0 # For Ethereum interaction
eth-utils>=2.1.0 # For Ethereum utilities
pyjwt>=2.8.0 # For JWT operations
cryptography>=42.0.0 # For key format conversion
jwcrypto