Cert module

The cert module provides functionality for managing client and server certificates used in mTLS connections with Akash providers.

Transaction parameters

All transaction functions support optional parameters like fee_amount, gas_limit, and gas_adjustment. See Transaction parameters for details.

CertClient class

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

cert = client.cert

Certificate File Storage

By default, certificate files are stored in a certs/ directory relative to your current working directory (where you run your Python app):

  • Client certificate: certs/client.pem
  • Client private key: certs/client-key.pem
  • CA certificate: certs/ca.pem

Example: If you run python my_script.py from /home/user/project/, certificates will be stored in /home/user/project/certs/

These files are automatically created when you use create_certificate_for_mtls() or get_mtls_credentials().
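Because the default paths are relative, the directory you launch Python from decides where the files land. The sketch below only illustrates that resolution with the standard library; `default_cert_paths` is a hypothetical helper and not part of the akash package.

```python
from pathlib import Path


def default_cert_paths(base_dir=None):
    """Resolve the three default certificate paths against a base directory.

    Hypothetical helper: it mirrors the file names listed above and falls
    back to the current working directory when no base directory is given.
    """
    base = Path(base_dir) if base_dir is not None else Path.cwd()
    certs_dir = base / "certs"
    return {
        "client_cert": certs_dir / "client.pem",
        "client_key": certs_dir / "client-key.pem",
        "ca_cert": certs_dir / "ca.pem",
    }


# Show where the files would live if the script ran from /home/user/project/.
for name, path in default_cert_paths("/home/user/project").items():
    print(f"{name}: {path}")
```

Running a script from a different directory therefore makes it look in a different certs/ directory, which is a common reason for "missing certificate" results.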

Transactions

create_certificate()

Create and publish a client certificate for mTLS authentication.

def create_certificate(wallet, memo: str = "", fee_amount: str = "20000",
                       gas_limit: int = None, use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet instance for signing transaction

Optional parameters:

  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: "20000")
  • gas_limit (int): Gas limit override (default: auto-calculated)
  • use_simulation (bool): Enable gas simulation (default: True)

Returns: BroadcastResult - Transaction result with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events

Example:

from akash import AkashClient, AkashWallet

client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")

result = client.cert.create_certificate(wallet)

if result.success:
    print(f"Certificate created: TX {result.tx_hash}")
else:
    print(f"Certificate creation failed: {result.raw_log}")

This function generates a complete certificate and private key pair, publishes the certificate to the blockchain, and stores the private key locally for mTLS authentication. If a certificate already exists for the wallet, it returns success without creating a duplicate.

publish_client_certificate()

Publish a client certificate to the blockchain.

def publish_client_certificate(wallet, cert_data: bytes, public_key: bytes,
                               memo: str = "", fee_amount: str = "20000",
                               gas_limit: int = None, gas_adjustment: float = 1.2,
                               use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet to sign the transaction
  • cert_data (bytes): Certificate data in bytes
  • public_key (bytes): Public key data in bytes

Optional parameters:

  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: "20000")
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)
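To make the role of gas_adjustment concrete, here is a minimal sketch of how a multiplier is commonly applied to a simulated gas figure. How the SDK combines the two internally is an assumption here, and `adjusted_gas_limit` is a hypothetical helper rather than a library function.

```python
import math


def adjusted_gas_limit(simulated_gas, gas_adjustment=1.2):
    """Scale a simulated gas figure by a safety multiplier, rounding up.

    Assumption: the multiplier is applied to the simulation result, as is
    conventional for Cosmos-style chains. The SDK may differ in detail.
    """
    if simulated_gas < 0 or gas_adjustment <= 0:
        raise ValueError("simulated_gas must be >= 0 and gas_adjustment > 0")
    return math.ceil(simulated_gas * gas_adjustment)


# A simulation that reported 100,000 gas, padded by 50 percent.
print(adjusted_gas_limit(100_000, 1.5))
```

Passing an explicit gas_limit is the alternative when you do not want the limit derived from a simulation.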

Returns: BroadcastResult with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

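# Read the PEM-encoded certificate as raw bytes.
with open("client.pem", "rb") as f:
    cert_data = f.read()

# public_key expects public key bytes, so read a file that holds the
# public key rather than the private key. The file name is a placeholder.
with open("client-pubkey.pem", "rb") as f:
    key_data = f.read()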

result = client.cert.publish_client_certificate(
    wallet=wallet,
    cert_data=cert_data,
    public_key=key_data
)

if result.success:
    print(f"Certificate published: {result.tx_hash}")
else:
    print(f"Failed to publish certificate: {result.raw_log}")
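The events field is a list of dictionaries, each with a type and a list of key/value attributes. A small helper like the following can flatten the attributes of one event type; `event_attributes` and the sample data are illustrative and not part of the library.

```python
def event_attributes(events, event_type):
    """Collect key/value pairs from every event of the given type.

    Works on the documented shape: a list of dicts with "type" and
    "attributes", where each attribute has "key" and "value".
    Later attributes overwrite earlier ones that share a key.
    """
    found = {}
    for event in events:
        if event.get("type") != event_type:
            continue
        for attribute in event.get("attributes", []):
            found[attribute["key"]] = attribute["value"]
    return found


# Made-up sample data in the documented shape.
sample_events = [
    {"type": "message", "attributes": [{"key": "action", "value": "example-action"}]},
    {"type": "tx", "attributes": [{"key": "fee", "value": "20000uakt"}]},
]

print(event_attributes(sample_events, "tx"))
```

With a real transaction you would pass `result.events` instead of the sample list.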

publish_server_certificate()

Publish a server certificate to the blockchain.

def publish_server_certificate(wallet, cert_data: bytes, public_key: bytes,
                               memo: str = "", fee_amount: str = "20000",
                               gas_limit: int = None, gas_adjustment: float = 1.2,
                               use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet to sign the transaction
  • cert_data (bytes): Certificate data in bytes
  • public_key (bytes): Public key data in bytes

Optional parameters:

  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: "20000")
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)

Returns: BroadcastResult with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
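client = AkashClient("https://akash-rpc.polkachu.com:443")

# Load the server certificate and its public key as bytes.
# The file names below are placeholders; use your own paths.
with open("server.pem", "rb") as f:
    cert_data = f.read()

with open("server-pubkey.pem", "rb") as f:
    key_data = f.read()

result = client.cert.publish_server_certificate(
    wallet=wallet,
    cert_data=cert_data,
    public_key=key_data,
    memo=""
)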

if result.success:
    print(f"Server certificate published: {result.tx_hash}")

revoke_client_certificate()

Revoke a client certificate.

def revoke_client_certificate(wallet, serial: str,
                              memo: str = "", fee_amount: str = "20000",
                              gas_limit: int = None, gas_adjustment: float = 1.2,
                              use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet to sign the transaction (must be certificate owner)
  • serial (str): Certificate serial number to revoke

Optional parameters:

  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: "20000")
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)

Returns: BroadcastResult with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

result = client.cert.revoke_client_certificate(
    wallet=wallet,
    serial="abc123def456"
)

if result.success:
    print(f"Certificate revoked: {result.tx_hash}")

revoke_server_certificate()

Revoke a server certificate.

def revoke_server_certificate(wallet, serial: str,
                              memo: str = "", fee_amount: str = "20000",
                              gas_limit: int = None, gas_adjustment: float = 1.2,
                              use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet to sign the transaction (must be certificate owner)
  • serial (str): Certificate serial number to revoke

Optional parameters:

  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: "20000")
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)

Returns: BroadcastResult with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

result = client.cert.revoke_server_certificate(
    wallet=wallet,
    serial="xyz789abc123"
)

if result.success:
    print(f"Server certificate revoked: {result.tx_hash}")

create_certificate_for_mtls()

Create and store a client certificate for mTLS, saving to local files.

def create_certificate_for_mtls(wallet, ca_cert_path: Optional[str] = None) -> Dict[str, Any]

Required parameters:

  • wallet (AkashWallet): Wallet for signing transaction

Optional parameters:

  • ca_cert_path (str): Path to provider's CA certificate (default: None)

Returns: Dict[str, Any] with the following fields:

  • status (str): Operation status ("success" or "error")
  • tx_hash (str): Transaction hash (if successful)
  • cert_data (Dict): Certificate data
    • cert (str): Certificate in PEM format
    • key (str): Private key in PEM format
    • ca_cert (str): CA certificate in PEM format
    • serial (str): Certificate serial number
  • file_paths (Dict): Saved certificate file paths
    • client_cert (str): Client certificate file path
    • client_key (str): Client key file path
    • ca_cert (str): CA certificate file path
  • error (str): Error message (if status is "error")

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

result = client.cert.create_certificate_for_mtls(wallet=wallet)

if result["status"] == "success":
    print(f"Certificate created and published: {result['tx_hash']}")
    print(f"Client cert saved to: {result['file_paths']['client_cert']}")
    print(f"Client key saved to: {result['file_paths']['client_key']}")
    print(f"CA cert saved to: {result['file_paths']['ca_cert']}")
else:
    print(f"Certificate creation failed: {result['error']}")

Queries

get_certificates()

Get certificates from blockchain with optional filters.

def get_certificates(owner: str = None, serial: str = None, state: str = None,
                     limit: int = 100, offset: int = 0, count_total: bool = False,
                     reverse: bool = False) -> Dict[str, Any]

Optional parameters:

  • owner (str): Filter by certificate owner address (default: None)
  • serial (str): Filter by certificate serial number (default: None)
  • state (str): Filter by certificate state - "valid", "revoked", "invalid" (default: None)
  • limit (int): Maximum number of certificates to return (default: 100)
  • offset (int): Number of certificates to skip (default: 0)
  • count_total (bool): Whether to count total number of certificates (default: False)
  • reverse (bool): Whether to return results in reverse order (default: False)

Returns: Dict[str, Any] with the following fields:

  • certificates (List[Dict]): List of certificate information
    • serial (str): Certificate serial number
    • certificate (Dict): Certificate details
      • state (str): Certificate state ("valid", "revoked", "invalid")
      • cert (str): Base64-encoded certificate data
      • pubkey (str): Base64-encoded public key data
  • pagination (Dict): Pagination information
    • next_key (str): Key for next page
    • total (int): Total count (if count_total=True)

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

certs = client.cert.get_certificates(
    owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
    state="valid",
    limit=50
)

print(f"Found {len(certs['certificates'])} valid certificates")
for cert in certs["certificates"]:
    print(f"Serial: {cert['serial']}, State: {cert['certificate']['state']}")

if certs["pagination"]["next_key"]:
    print("More certificates available")
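When an owner has more certificates than one page holds, limit and offset can be combined into a loop. The sketch below assumes that a page shorter than the requested limit marks the end of the results; `iter_certificates` is a hypothetical helper, and `fake_get_certificates` stands in for `client.cert.get_certificates` so the example runs offline.

```python
def iter_certificates(fetch_page, page_size=100, **filters):
    """Yield certificates page by page until a short page is returned.

    fetch_page is any callable with the limit/offset keyword arguments
    documented for get_certificates(). Stopping on a short page is an
    assumption about the pagination behaviour.
    """
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset, **filters)
        certificates = page.get("certificates", [])
        yield from certificates
        if len(certificates) < page_size:
            break
        offset += page_size


def fake_get_certificates(limit=100, offset=0, **filters):
    """Offline stand-in that serves five made-up certificates."""
    data = [{"serial": str(n)} for n in range(5)]
    return {"certificates": data[offset:offset + limit], "pagination": {}}


serials = [c["serial"] for c in iter_certificates(fake_get_certificates, page_size=2)]
print(serials)
```

Against a live node the call would look like `iter_certificates(client.cert.get_certificates, page_size=50, owner="akash1...", state="valid")`.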

get_certificate()

Get a specific certificate by owner and serial number.

def get_certificate(owner: str, serial: str) -> Optional[Dict]

Required parameters:

  • owner (str): Certificate owner address
  • serial (str): Certificate serial number

Returns: Optional[Dict] - Certificate information with the following fields, or None if the certificate is not found:

  • serial (str): Certificate serial number
  • certificate (Dict): Certificate details
    • state (str): Certificate state
    • cert (str): Base64-encoded certificate data
    • pubkey (str): Base64-encoded public key data

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

cert = client.cert.get_certificate(
    owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
    serial="abc123def456"
)

if cert:
    print(f"Certificate state: {cert['certificate']['state']}")
else:
    print("Certificate not found")
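The cert and pubkey fields arrive Base64-encoded, so they need decoding before you can inspect or store them. The following is a minimal sketch using the standard library; `decode_certificate_field` is a hypothetical helper, and whether the decoded bytes are PEM text or another encoding is not stated here, so treat that as something to verify.

```python
import base64


def decode_certificate_field(encoded):
    """Decode a Base64 string into bytes, rejecting malformed input.

    validate=True makes b64decode raise binascii.Error on characters
    outside the Base64 alphabet instead of silently skipping them.
    """
    return base64.b64decode(encoded, validate=True)


# Round-trip some placeholder bytes to show the call.
sample = base64.b64encode(b"example certificate bytes").decode("ascii")
print(decode_certificate_field(sample))
```

With a real query result you would pass `cert["certificate"]["cert"]` or `cert["certificate"]["pubkey"]`.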

get_mtls_credentials()

Get certificate data for mTLS from blockchain and save to files.

def get_mtls_credentials(owner: str, serial: str = None) -> Dict[str, Any]

Required parameters:

  • owner (str): Certificate owner's Akash address

Optional parameters:

  • serial (str): Certificate serial number (default: None, which selects the latest certificate)

Returns: Dict[str, Any] with the following fields:

  • status (str): Operation status ("success" or "error")
  • file_paths (Dict): Certificate file paths
    • client_cert (str): Client certificate file path
    • client_key (str): Client key file path
    • ca_cert (str): CA certificate file path
  • cert_data (Dict): Certificate data
    • cert (str): Certificate in PEM format
    • key (str): Private key in PEM format
    • serial (str): Certificate serial number
  • error (str): Error message (if failed)

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

credentials = client.cert.get_mtls_credentials(
    owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4"
)

if credentials["status"] == "success":
    print("Credentials retrieved and saved to files")
    print(f"Client cert: {credentials['file_paths']['client_cert']}")

Utils

get_cert_file_paths()

Get standard certificate file paths used for mTLS connections.

def get_cert_file_paths() -> Dict[str, str]

Returns: Dict[str, str] with the following fields:

  • client_cert (str): Path to client certificate file (default: certs/client.pem)
  • client_key (str): Path to client private key file (default: certs/client-key.pem)
  • ca_cert (str): Path to CA certificate file (default: certs/ca.pem)

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

paths = client.cert.get_cert_file_paths()

print(f"Client cert: {paths['client_cert']}")
print(f"Client key: {paths['client_key']}")
print(f"CA cert: {paths['ca_cert']}")

check_cert_files_exist()

Check if all required certificate files exist for mTLS connections. This checks for files in the default certs/ directory.

def check_cert_files_exist() -> bool

Returns: bool - True if all certificate files exist in certs/ directory, False otherwise

Example:

import os
from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

if client.cert.check_cert_files_exist():
    print("All certificate files are present in certs/ directory")
else:
    print("Some certificate files are missing")

    paths = client.cert.get_cert_file_paths()
    for name, path in paths.items():
        if os.path.exists(path):
            print(f"[EXISTS] {name}: {path}")
        else:
            print(f"[MISSING] {name}: {path}")

create_ssl_context()

Create SSL context for mTLS connections with providers. This uses the certificate files from the certs/ directory.

def create_ssl_context() -> Dict[str, Any]

Prerequisites: Certificate files must exist in certs/ directory. Use create_certificate_for_mtls() or get_mtls_credentials() to create them.

Returns: Dict[str, Any] with the following fields:

  • status (str): Operation status ("success" or "error")
  • ssl_context (ssl.SSLContext): SSL context object (if successful)
  • error (str): Error message (if failed)

Example:

from akash import AkashClient, AkashWallet

client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")

if not client.cert.check_cert_files_exist():
    print("Certificate files not found. Creating them...")
    client.cert.create_certificate_for_mtls(wallet)

result = client.cert.create_ssl_context()

if result["status"] == "success":
    ssl_context = result["ssl_context"]
    print("SSL context created successfully from certs/ directory")
else:
    print(f"Failed to create SSL context: {result['error']}")
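For readers who want to see what building such a context involves, here is a rough standard-library equivalent. It is a sketch under assumptions, not the SDK's implementation: `build_mtls_context` is a hypothetical helper, and the verification settings the SDK applies for provider connections may differ.

```python
import ssl


def build_mtls_context(client_cert, client_key, ca_cert=None):
    """Return a status dict holding an SSLContext loaded with a client cert.

    Mirrors the documented return shape ("status", "ssl_context", "error").
    Missing or unreadable files surface as OSError (ssl.SSLError is a
    subclass), which is turned into an error result.
    """
    try:
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_cert)
        context.load_cert_chain(certfile=client_cert, keyfile=client_key)
    except OSError as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "ssl_context": context}


result = build_mtls_context("certs/client.pem", "certs/client-key.pem", "certs/ca.pem")
print(result["status"])
```

The status printed depends on whether the three files exist relative to the directory you run the script from.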

validate_ssl_certificate()

Validate SSL certificate PEM format using cryptography library.

def validate_ssl_certificate(cert_pem: str) -> bool

Required parameters:

  • cert_pem (str): Certificate in PEM format

Returns: bool - True if certificate is valid, False otherwise

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

with open("client.pem", "r") as f:
    cert_pem = f.read()

if client.cert.validate_ssl_certificate(cert_pem):
    print("Certificate is valid")
else:
    print("Certificate is invalid or malformed")
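If you only need a quick structural check before calling the SDK, the standard library can confirm that a string has PEM certificate framing around a decodable Base64 body. This sketch is weaker than a validation done with the cryptography library, because it does not parse the X.509 structure; `looks_like_pem_certificate` is a hypothetical helper.

```python
import ssl


def looks_like_pem_certificate(cert_pem):
    """Return True if the text has PEM certificate framing and a Base64 body.

    ssl.PEM_cert_to_DER_cert raises ValueError when the BEGIN/END markers
    are missing or the body cannot be decoded. The X.509 content itself
    is not inspected.
    """
    try:
        der = ssl.PEM_cert_to_DER_cert(cert_pem.strip())
    except ValueError:
        return False
    return len(der) > 0


print(looks_like_pem_certificate("this is not a certificate"))
```

A True result only says the framing is intact, so a full validation is still needed before relying on the certificate.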

verify_certificate_files()

Verify that certificate files exist and are readable for gRPC. Checks files in the certs/ directory.

def verify_certificate_files() -> Dict[str, Any]

Returns: Dict[str, Any] with the following fields:

  • status (str): Verification status ("success", "partial", "failed")
  • files (Dict): Per-file verification results
    • client_cert (Dict): Client certificate file status
      • exists (bool): Whether file exists
      • readable (bool): Whether file is readable
      • size (int, optional): File size in bytes (if exists and readable)
      • error (str, optional): Error message (if not readable)
    • client_key (Dict): Client key file status
      • exists (bool): Whether file exists
      • readable (bool): Whether file is readable
      • size (int, optional): File size in bytes (if exists and readable)
      • error (str, optional): Error message (if not readable)
    • ca_cert (Dict): CA certificate file status
      • exists (bool): Whether file exists
      • readable (bool): Whether file is readable
      • size (int, optional): File size in bytes (if exists and readable)
      • error (str, optional): Error message (if not readable)

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

verification = client.cert.verify_certificate_files()

print(f"Verification status: {verification['status']}")
for file_type, info in verification["files"].items():
    if info["exists"]:
        print(f"{file_type}: exists, readable: {info['readable']}")
    else:
        print(f"{file_type}: missing")
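The per-file result shape can be reproduced with a few standard-library calls, which helps when debugging why a file is reported as unreadable. Both helpers below are hypothetical, and the rule used for "partial" (some files usable, but not all) is an assumption about how the status is derived.

```python
import os


def inspect_file(path):
    """Report exists/readable/size for one file in the documented shape."""
    status = {"exists": os.path.isfile(path), "readable": False}
    if not status["exists"]:
        return status
    try:
        with open(path, "rb") as handle:
            status["size"] = len(handle.read())
        status["readable"] = True
    except OSError as exc:
        status["error"] = str(exc)
    return status


def summarize(files):
    """Collapse per-file results into "success", "partial", or "failed"."""
    usable = [info["exists"] and info["readable"] for info in files.values()]
    if usable and all(usable):
        return "success"
    return "partial" if any(usable) else "failed"


files = {name: inspect_file(path) for name, path in {
    "client_cert": "certs/client.pem",
    "client_key": "certs/client-key.pem",
    "ca_cert": "certs/ca.pem",
}.items()}
print(summarize(files))
```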

check_expiry()

Check certificate expiry status and calculate days remaining.

def check_expiry(certificate: Dict[str, Any]) -> Dict[str, Any]

Required parameters:

  • certificate (Dict[str, Any]): Certificate dictionary, as returned in the certificates list of get_certificates()
    • serial (str): Certificate serial number
    • certificate (Dict): Certificate details
      • state (str): Certificate state
      • cert (str): Base64-encoded certificate data
      • pubkey (str): Base64-encoded public key data

Returns: Dict[str, Any] with the following fields:

  • expired (bool): Whether certificate has expired
  • valid_until (str): ISO datetime string of expiry date (if parseable)
  • days_remaining (int): Days until expiry (0 if expired)
  • message (str): Human-readable status message

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

cert_response = client.cert.get_certificates(owner=wallet.address)
certificates = cert_response.get('certificates', [])

if certificates:
    cert = certificates[0]

    expiry_info = client.cert.check_expiry(cert)

    print(f"Certificate status: {expiry_info['message']}")
    print(f"Expired: {expiry_info['expired']}")

    # valid_until is only present when the expiry date could be parsed.
    if expiry_info.get('valid_until'):
        print(f"Valid until: {expiry_info['valid_until']}")

    if expiry_info['days_remaining'] is not None:
        if expiry_info['days_remaining'] < 30:
            print(f"Warning: certificate expires in {expiry_info['days_remaining']} days")
        else:
            print(f"Certificate valid for {expiry_info['days_remaining']} days")
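The days_remaining value comes down to date arithmetic on the certificate's expiry time. The sketch below shows that calculation on an ISO datetime string; `expiry_summary` is a hypothetical helper, and treating a timestamp without a timezone as UTC is an assumption.

```python
from datetime import datetime, timezone


def expiry_summary(valid_until_iso, now=None):
    """Compute expired/days_remaining from an ISO 8601 expiry timestamp."""
    now = now or datetime.now(timezone.utc)
    valid_until = datetime.fromisoformat(valid_until_iso)
    if valid_until.tzinfo is None:
        # Assumption: timestamps without an offset are in UTC.
        valid_until = valid_until.replace(tzinfo=timezone.utc)
    remaining = valid_until - now
    expired = remaining.total_seconds() <= 0
    return {
        "expired": expired,
        "valid_until": valid_until.isoformat(),
        "days_remaining": 0 if expired else remaining.days,
    }


# Fixed reference time so the result is reproducible.
reference = datetime(2025, 1, 1, tzinfo=timezone.utc)
print(expiry_summary("2025-01-31T00:00:00+00:00", now=reference))
```

Passing `now` explicitly keeps the calculation testable; omit it to compare against the current time.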

validate_certificate()

Validate certificate structure and data.

def validate_certificate(cert_data: Dict) -> bool

Required parameters:

  • cert_data (Dict): Certificate data to validate
    • cert (str): Certificate in PEM format
    • key (str): Private key in PEM format
    • serial (str, optional): Certificate serial number

Returns: bool - True if valid, False otherwise

Example:

from akash import AkashClient

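client = AkashClient("https://akash-rpc.polkachu.com:443")

# Placeholder owner address and serial; substitute your own values.
owner = "akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4"
serial = "abc123def456"

cert = client.cert.get_certificate(owner, serial)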
if cert:
    is_valid = client.cert.validate_certificate(cert)
    print(f"Certificate valid: {is_valid}")

ensure_certificate()

Ensure a valid certificate exists for mTLS communication. This function validates existing certificates and creates new ones if needed.

def ensure_certificate(wallet) -> tuple

Validation performed:

  • Checks if certificate has expired or is not yet valid
  • Verifies certificate serial number exists on blockchain
  • Validates private key matches certificate public key
  • Removes invalid certificates and creates new ones automatically

Required parameters:

  • wallet (AkashWallet): Wallet instance

Returns: tuple with the following elements:

  • success (bool): Whether certificate is ready for use
  • cert_pem (str): Certificate in PEM format (or None if failed)
  • key_pem (str): Private key in PEM format (or None if failed)

Example:

from akash import AkashClient, AkashWallet

client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")

success, cert_pem, key_pem = client.cert.ensure_certificate(wallet)

if success:
    print("Certificate validated and ready for use")
else:
    print("Failed to ensure certificate")

This function checks the certificate cache first, then validates the local certificate files against the blockchain. If validation fails (the certificate is expired, missing from the blockchain, or does not match the private key), it automatically creates a new certificate.

generate_certificate_serial()

Generate a deterministic serial number for a certificate.

def generate_certificate_serial(owner: str, cert_data: bytes) -> str

Required parameters:

  • owner (str): Certificate owner address
  • cert_data (bytes): Certificate data

Returns: str - Certificate serial number

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

with open("cert.pem", "rb") as f:
    cert_data = f.read()

serial = client.cert.generate_certificate_serial(
    owner="akash1...",
    cert_data=cert_data
)
print(f"Certificate serial: {serial}")
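"Deterministic" means the same owner and certificate bytes always produce the same serial. The sketch below illustrates that property with a hash. The derivation the library actually uses is not described in this section, so `example_serial` is purely hypothetical and its output will not match `generate_certificate_serial()`.

```python
import hashlib


def example_serial(owner, cert_data):
    """Derive a repeatable identifier from an owner address and cert bytes.

    Illustration only: this is NOT the library's algorithm. It shows that
    hashing fixed inputs yields a stable value with no randomness involved.
    """
    digest = hashlib.sha256(owner.encode("utf-8") + b":" + cert_data).hexdigest()
    return digest[:16]


first = example_serial("akash1...", b"placeholder certificate bytes")
second = example_serial("akash1...", b"placeholder certificate bytes")
print(first == second)
```

Because the value depends only on its inputs, it can be recomputed later to look up the matching certificate.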

cleanup_certificates()

Clean up locally stored certificate files from the certs/ directory.

def cleanup_certificates() -> Dict[str, Any]

Returns: Dict[str, Any] with the following fields:

  • status (str): Cleanup status ("success" or "error")
  • cleaned_files (List[str]): List of cleaned file names
  • message (str): Status message

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

result = client.cert.cleanup_certificates()

if result["status"] == "success":
    print(f"Cleaned up {len(result['cleaned_files'])} certificate files from certs/ directory")
    print(f"Removed files: {', '.join(result['cleaned_files'])}")
else:
    # The documented result carries its details in the message field.
    print(f"Cleanup failed: {result.get('message', 'Unknown error')}")