Cert module
The cert module provides functionality for managing client and server certificates used in mTLS connections with Akash providers.
Transaction parameters
All transaction functions support optional parameters like fee_amount, gas_limit, and gas_adjustment. See Transaction parameters for details.
CertClient class
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
cert = client.cert
Certificate File Storage
By default, certificate files are stored in a certs/ directory relative to your current working directory (where you run your Python app):
- Client certificate: certs/client.pem
- Client private key: certs/client-key.pem
- CA certificate: certs/ca.pem
Example: If you run python my_script.py from /home/user/project/, certificates will be stored in /home/user/project/certs/
These files are automatically created when you use create_certificate_for_mtls() or get_mtls_credentials().
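Because the default paths are relative, the files end up in a different place whenever the process starts from a different directory. The following is a minimal sketch of how the three default locations resolve against a base directory, using only the standard library. The helper name `resolve_default_cert_paths` is hypothetical and not part of the SDK; it only mirrors the layout described above.

```python
from pathlib import Path


def resolve_default_cert_paths(base_dir=None):
    """Return the three default certificate paths under <base_dir>/certs.

    Hypothetical helper for illustration only. When base_dir is omitted,
    the current working directory is used, matching the behaviour
    described in the text.
    """
    base = Path(base_dir) if base_dir is not None else Path.cwd()
    certs_dir = base / "certs"
    return {
        "client_cert": certs_dir / "client.pem",
        "client_key": certs_dir / "client-key.pem",
        "ca_cert": certs_dir / "ca.pem",
    }


if __name__ == "__main__":
    # Shows where the files would be expected for the current directory.
    for name, path in resolve_default_cert_paths().items():
        print(f"{name}: {path}")
```

Running scripts from a fixed project directory, or checking the resolved paths first, avoids ending up with several unrelated certs/ directories.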
Transactions
create_certificate()
Create and publish a client certificate for mTLS authentication.
def create_certificate(wallet, memo: str = "", fee_amount: str = "20000",
                       gas_limit: int = None, use_simulation: bool = True) -> BroadcastResult
Required parameters:
- wallet (AkashWallet): Wallet instance for signing transaction
Optional parameters:
- memo (str): Transaction memo (default: "")
- fee_amount (str): Transaction fee amount in uakt (default: "20000")
- gas_limit (int): Gas limit override (default: auto-calculated)
- use_simulation (bool): Enable gas simulation (default: True)
Returns: BroadcastResult - Transaction result with the following fields:
- tx_hash (str): Transaction hash
- code (int): Response code (0 for success)
- raw_log (str): Raw transaction log/error message
- success (bool): Whether transaction succeeded
- events (List[Dict]): Transaction events
Example:
from akash import AkashClient, AkashWallet
client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")
result = client.cert.create_certificate(wallet)
if result.success:
    print(f"Certificate created: TX {result.tx_hash}")
else:
    print(f"Certificate creation failed: {result.raw_log}")
This function generates a complete certificate and private key pair, publishes the certificate to the blockchain, and stores the private key locally for mTLS authentication. If a certificate already exists for the wallet, it returns success without creating a duplicate.
publish_client_certificate()
Publish a client certificate to the blockchain.
def publish_client_certificate(wallet, cert_data: bytes, public_key: bytes,
                               memo: str = "", fee_amount: str = "20000",
                               gas_limit: int = None, gas_adjustment: float = 1.2,
                               use_simulation: bool = True) -> BroadcastResult
Required parameters:
- wallet (AkashWallet): Wallet to sign the transaction
- cert_data (bytes): Certificate data in bytes
- public_key (bytes): Public key data in bytes
Optional parameters:
- memo (str): Transaction memo (default: "")
- fee_amount (str): Transaction fee amount in uakt (default: "20000")
- gas_limit (int): Gas limit override (default: None)
- gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
- use_simulation (bool): Enable gas simulation (default: True)
Returns: BroadcastResult with the following fields:
- tx_hash (str): Transaction hash
- code (int): Response code (0 for success)
- raw_log (str): Raw transaction log/error message
- success (bool): Whether transaction succeeded
- events (List[Dict]): Transaction events
  - type (str): Event type
  - attributes (List[Dict]): Event attributes
    - key (str): Attribute key
    - value (str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
with open("client.pem", "rb") as f:
    cert_data = f.read()
with open("client-key.pem", "rb") as f:
    key_data = f.read()
result = client.cert.publish_client_certificate(
    wallet=wallet,
    cert_data=cert_data,
    public_key=key_data
)
if result.success:
    print(f"Certificate published: {result.tx_hash}")
else:
    print(f"Failed to publish certificate: {result.raw_log}")
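The events field is documented as a list of dictionaries, each with a type and a list of key/value attributes. The following is a minimal sketch of pulling attribute values out of that structure. The helper `find_event_attributes` is hypothetical (not part of the SDK), and the sample event types and keys are illustrative only; the real ones depend on the chain.

```python
def find_event_attributes(events, event_type, key):
    """Collect the values of `key` from every event whose type is `event_type`."""
    values = []
    for event in events or []:
        if event.get("type") != event_type:
            continue
        for attribute in event.get("attributes", []):
            if attribute.get("key") == key:
                values.append(attribute.get("value"))
    return values


# Illustrative data shaped like the documented events field.
sample_events = [
    {"type": "message", "attributes": [{"key": "action", "value": "example-action"}]},
    {"type": "tx", "attributes": [{"key": "fee", "value": "20000uakt"}]},
]

print(find_event_attributes(sample_events, "tx", "fee"))  # ['20000uakt']
```

With a real transaction, `result.events` would take the place of `sample_events`.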
publish_server_certificate()
Publish a server certificate to the blockchain.
def publish_server_certificate(wallet, cert_data: bytes, public_key: bytes,
                               memo: str = "", fee_amount: str = "20000",
                               gas_limit: int = None, gas_adjustment: float = 1.2,
                               use_simulation: bool = True) -> BroadcastResult
Required parameters:
- wallet (AkashWallet): Wallet to sign the transaction
- cert_data (bytes): Certificate data in bytes
- public_key (bytes): Public key data in bytes
Optional parameters:
- memo (str): Transaction memo (default: "")
- fee_amount (str): Transaction fee amount in uakt (default: "20000")
- gas_limit (int): Gas limit override (default: None)
- gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
- use_simulation (bool): Enable gas simulation (default: True)
Returns: BroadcastResult with the following fields:
- tx_hash (str): Transaction hash
- code (int): Response code (0 for success)
- raw_log (str): Raw transaction log/error message
- success (bool): Whether transaction succeeded
- events (List[Dict]): Transaction events
  - type (str): Event type
  - attributes (List[Dict]): Event attributes
    - key (str): Attribute key
    - value (str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
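# File names below are placeholders; point them at your own server certificate and key files.
with open("server.pem", "rb") as f:
    cert_data = f.read()
with open("server-key.pem", "rb") as f:
    key_data = f.read()
result = client.cert.publish_server_certificate(
    wallet=wallet,
    cert_data=cert_data,
    public_key=key_data,
    memo=""
)
if result.success:
    print(f"Server certificate published: {result.tx_hash}")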
revoke_client_certificate()
Revoke a client certificate.
def revoke_client_certificate(wallet, serial: str,
                              memo: str = "", fee_amount: str = "20000",
                              gas_limit: int = None, gas_adjustment: float = 1.2,
                              use_simulation: bool = True) -> BroadcastResult
Required parameters:
- wallet (AkashWallet): Wallet to sign the transaction (must be certificate owner)
- serial (str): Certificate serial number to revoke
Optional parameters:
- memo (str): Transaction memo (default: "")
- fee_amount (str): Transaction fee amount in uakt (default: "20000")
- gas_limit (int): Gas limit override (default: None)
- gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
- use_simulation (bool): Enable gas simulation (default: True)
Returns: BroadcastResult with the following fields:
- tx_hash (str): Transaction hash
- code (int): Response code (0 for success)
- raw_log (str): Raw transaction log/error message
- success (bool): Whether transaction succeeded
- events (List[Dict]): Transaction events
  - type (str): Event type
  - attributes (List[Dict]): Event attributes
    - key (str): Attribute key
    - value (str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.cert.revoke_client_certificate(
    wallet=wallet,
    serial="abc123def456"
)
if result.success:
    print(f"Certificate revoked: {result.tx_hash}")
revoke_server_certificate()
Revoke a server certificate.
def revoke_server_certificate(wallet, serial: str,
                              memo: str = "", fee_amount: str = "20000",
                              gas_limit: int = None, gas_adjustment: float = 1.2,
                              use_simulation: bool = True) -> BroadcastResult
Required parameters:
- wallet (AkashWallet): Wallet to sign the transaction (must be certificate owner)
- serial (str): Certificate serial number to revoke
Optional parameters:
- memo (str): Transaction memo (default: "")
- fee_amount (str): Transaction fee amount in uakt (default: "20000")
- gas_limit (int): Gas limit override (default: None)
- gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
- use_simulation (bool): Enable gas simulation (default: True)
Returns: BroadcastResult with the following fields:
- tx_hash (str): Transaction hash
- code (int): Response code (0 for success)
- raw_log (str): Raw transaction log/error message
- success (bool): Whether transaction succeeded
- events (List[Dict]): Transaction events
  - type (str): Event type
  - attributes (List[Dict]): Event attributes
    - key (str): Attribute key
    - value (str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.cert.revoke_server_certificate(
    wallet=wallet,
    serial="xyz789abc123"
)
if result.success:
    print(f"Server certificate revoked: {result.tx_hash}")
create_certificate_for_mtls()
Create and store a client certificate for mTLS, saving to local files.
Required parameters:
- wallet (AkashWallet): Wallet for signing transaction
Optional parameters:
- ca_cert_path (str): Path to provider's CA certificate (default: None)
Returns: Dict[str, Any] with the following fields:
- status (str): Operation status ("success" or "error")
- tx_hash (str): Transaction hash (if successful)
- cert_data (Dict): Certificate data
  - cert (str): Certificate in PEM format
  - key (str): Private key in PEM format
  - ca_cert (str): CA certificate in PEM format
  - serial (str): Certificate serial number
- file_paths (Dict): Saved certificate file paths
  - client_cert (str): Client certificate file path
  - client_key (str): Client key file path
  - ca_cert (str): CA certificate file path
- error (str): Error message (if status is "error")
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.cert.create_certificate_for_mtls(wallet=wallet)
if result["status"] == "success":
    print(f"Certificate created and published: {result['tx_hash']}")
    print(f"Client cert saved to: {result['file_paths']['client_cert']}")
    print(f"Client key saved to: {result['file_paths']['client_key']}")
    print(f"CA cert saved to: {result['file_paths']['ca_cert']}")
else:
    print(f"Certificate creation failed: {result['error']}")
Queries
get_certificates()
Get certificates from blockchain with optional filters.
def get_certificates(owner: str = None, serial: str = None, state: str = None,
                     limit: int = 100, offset: int = 0, count_total: bool = False,
                     reverse: bool = False) -> Dict[str, Any]
Optional parameters:
- owner (str): Filter by certificate owner address (default: None)
- serial (str): Filter by certificate serial number (default: None)
- state (str): Filter by certificate state - "valid", "revoked", "invalid" (default: None)
- limit (int): Maximum number of certificates to return (default: 100)
- offset (int): Number of certificates to skip (default: 0)
- count_total (bool): Whether to count total number of certificates (default: False)
- reverse (bool): Whether to return results in reverse order (default: False)
Returns: Dict[str, Any] with the following fields:
- certificates (List[Dict]): List of certificate information
  - serial (str): Certificate serial number
  - certificate (Dict): Certificate details
    - state (str): Certificate state ("valid", "revoked", "invalid")
    - cert (str): Base64-encoded certificate data
    - pubkey (str): Base64-encoded public key data
- pagination (Dict): Pagination information
  - next_key (str): Key for next page
  - total (int): Total count (if count_total=True)
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
certs = client.cert.get_certificates(
    owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
    state="valid",
    limit=50
)
print(f"Found {len(certs['certificates'])} valid certificates")
for cert in certs["certificates"]:
    print(f"Serial: {cert['serial']}, State: {cert['certificate']['state']}")
if certs["pagination"]["next_key"]:
    print("More certificates available")
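When more certificates exist than fit in one page, the limit and offset parameters can be combined to walk through all of them. The following is a minimal sketch of such a loop. It is written against a generic `fetch_page(limit, offset)` callable so it runs without a node; `iter_all_certificates` and `fake_fetch` are hypothetical names, and the stop condition (a page shorter than the page size) is an assumption rather than documented SDK behaviour.

```python
def iter_all_certificates(fetch_page, page_size=100):
    """Yield certificates page by page until a short page is returned.

    fetch_page(limit=..., offset=...) is expected to return a dict shaped
    like the get_certificates() result, with a "certificates" list.
    """
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)
        certificates = page.get("certificates", [])
        yield from certificates
        if len(certificates) < page_size:
            break
        offset += page_size


# Stand-in for client.cert.get_certificates so the sketch is self-contained.
_FAKE_CHAIN = [{"serial": str(n)} for n in range(5)]


def fake_fetch(limit, offset):
    return {"certificates": _FAKE_CHAIN[offset:offset + limit]}


serials = [c["serial"] for c in iter_all_certificates(fake_fetch, page_size=2)]
print(serials)  # ['0', '1', '2', '3', '4']
```

With a real client, something like `functools.partial(client.cert.get_certificates, owner=..., state="valid")` could be passed as `fetch_page`, since the signature above accepts limit and offset as keyword arguments.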
get_certificate()
Get a specific certificate by owner and serial number.
Required parameters:
- owner (str): Certificate owner address
- serial (str): Certificate serial number
Returns: Optional[Dict] - Certificate information, or None if not found, with the following fields:
- serial (str): Certificate serial number
- certificate (Dict): Certificate details
  - state (str): Certificate state
  - cert (str): Base64-encoded certificate data
  - pubkey (str): Base64-encoded public key data
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
cert = client.cert.get_certificate(
    owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
    serial="abc123def456"
)
if cert:
    print(f"Certificate state: {cert['certificate']['state']}")
else:
    print("Certificate not found")
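The cert and pubkey fields are documented as Base64-encoded. The following is a minimal sketch of decoding such a field with the standard library. It assumes, without confirmation from this page, that the decoded bytes are PEM text; the helper `decode_certificate_field` is hypothetical and the sample payload is a placeholder, not a real certificate.

```python
import base64


def decode_certificate_field(encoded):
    """Decode a Base64 string and report whether the result looks like PEM."""
    raw = base64.b64decode(encoded)
    text = raw.decode("utf-8", errors="replace")
    looks_like_pem = text.lstrip().startswith("-----BEGIN ")
    return text, looks_like_pem


# Placeholder payload standing in for cert["certificate"]["cert"].
sample_pem = "-----BEGIN CERTIFICATE-----\nAAAA\n-----END CERTIFICATE-----\n"
encoded = base64.b64encode(sample_pem.encode("utf-8")).decode("ascii")

text, looks_like_pem = decode_certificate_field(encoded)
print(looks_like_pem)  # True
```

If the check reports False for real data, the payload is probably in another encoding (for example raw DER), and the assumption above does not hold.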
get_mtls_credentials()
Get certificate data for mTLS from blockchain and save to files.
Required parameters:
- owner (str): Certificate owner's Akash address
Optional parameters:
- serial (str): Certificate serial (default: None, gets latest if None)
Returns: Dict[str, Any] with the following fields:
- status (str): Operation status ("success" or "error")
- file_paths (Dict): Certificate file paths
  - client_cert (str): Client certificate file path
  - client_key (str): Client key file path
  - ca_cert (str): CA certificate file path
- cert_data (Dict): Certificate data
  - cert (str): Certificate in PEM format
  - key (str): Private key in PEM format
  - serial (str): Certificate serial number
- error (str): Error message (if failed)
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
credentials = client.cert.get_mtls_credentials(
    owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4"
)
if credentials["status"] == "success":
    print("Credentials retrieved and saved to files")
    print(f"Client cert: {credentials['file_paths']['client_cert']}")
Utils
get_cert_file_paths()
Get standard certificate file paths used for mTLS connections.
Returns: Dict[str, str] with the following fields:
- client_cert (str): Path to client certificate file (default: certs/client.pem)
- client_key (str): Path to client private key file (default: certs/client-key.pem)
- ca_cert (str): Path to CA certificate file (default: certs/ca.pem)
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
paths = client.cert.get_cert_file_paths()
print(f"Client cert: {paths['client_cert']}")
print(f"Client key: {paths['client_key']}")
print(f"CA cert: {paths['ca_cert']}")
check_cert_files_exist()
Check if all required certificate files exist for mTLS connections. This checks for files in the default certs/ directory.
Returns: bool - True if all certificate files exist in the certs/ directory, False otherwise
Example:
import os
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
if client.cert.check_cert_files_exist():
    print("All certificate files are present in certs/ directory")
else:
    print("Some certificate files are missing")
    # List each expected file and whether it is present
    paths = client.cert.get_cert_file_paths()
    for name, path in paths.items():
        if os.path.exists(path):
            print(f"[EXISTS] {name}: {path}")
        else:
            print(f"[MISSING] {name}: {path}")
create_ssl_context()
Create SSL context for mTLS connections with providers. This uses the certificate files from the certs/ directory.
Prerequisites: Certificate files must exist in the certs/ directory. Use create_certificate_for_mtls() or get_mtls_credentials() to create them.
Returns: Dict[str, Any] with the following fields:
- status (str): Operation status ("success" or "error")
- ssl_context (ssl.SSLContext): SSL context object (if successful)
- error (str): Error message (if failed)
Example:
from akash import AkashClient, AkashWallet
client = AkashClient("https://akash-rpc.polkachu.com:443")
# A wallet is only needed if the certificate files have to be created
wallet = AkashWallet.from_mnemonic("your mnemonic here")
if not client.cert.check_cert_files_exist():
    print("Certificate files not found. Creating them...")
    client.cert.create_certificate_for_mtls(wallet)
result = client.cert.create_ssl_context()
if result["status"] == "success":
    ssl_context = result["ssl_context"]
    print("SSL context created successfully from certs/ directory")
else:
    print(f"Failed to create SSL context: {result['error']}")
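For readers curious what building such a context involves, here is a minimal sketch using only Python's standard ssl module. It is not the SDK's implementation: `build_mtls_context` is a hypothetical helper that merely mirrors the documented return shape, and the verification settings an Akash provider actually requires may differ from the defaults used here.

```python
import os
import ssl


def build_mtls_context(client_cert, client_key, ca_cert=None):
    """Build a client-side SSLContext and return a status dict.

    Hypothetical helper for illustration; returns {"status": "error", ...}
    instead of raising when the files are missing or unusable.
    """
    missing = [p for p in (client_cert, client_key) if not os.path.isfile(p)]
    if missing:
        return {"status": "error", "error": f"Missing files: {', '.join(missing)}"}
    try:
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        # Present the client certificate and key during the TLS handshake.
        context.load_cert_chain(certfile=client_cert, keyfile=client_key)
        if ca_cert and os.path.isfile(ca_cert):
            # Trust the given CA when verifying the server certificate.
            context.load_verify_locations(cafile=ca_cert)
    except (ssl.SSLError, OSError) as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "ssl_context": context}


result = build_mtls_context("certs/client.pem", "certs/client-key.pem", "certs/ca.pem")
print(result["status"])
```

Returning a status dictionary rather than raising keeps the calling code in the same shape as the create_ssl_context() example above.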
validate_ssl_certificate()
Validate SSL certificate PEM format using the cryptography library.
Required parameters:
- cert_pem (str): Certificate in PEM format
Returns: bool - True if certificate is valid, False otherwise
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
with open("client.pem", "r") as f:
    cert_pem = f.read()
if client.cert.validate_ssl_certificate(cert_pem):
    print("Certificate is valid")
else:
    print("Certificate is invalid or malformed")
verify_certificate_files()
Verify that certificate files exist and are readable for gRPC. Checks files in the certs/ directory.
Returns: Dict[str, Any] with the following fields:
- status (str): Verification status ("success", "partial", "failed")
- files (Dict): Per-file verification results
  - client_cert (Dict): Client certificate file status
    - exists (bool): Whether file exists
    - readable (bool): Whether file is readable
    - size (int, optional): File size in bytes (if exists and readable)
    - error (str, optional): Error message (if not readable)
  - client_key (Dict): Client key file status
    - exists (bool): Whether file exists
    - readable (bool): Whether file is readable
    - size (int, optional): File size in bytes (if exists and readable)
    - error (str, optional): Error message (if not readable)
  - ca_cert (Dict): CA certificate file status
    - exists (bool): Whether file exists
    - readable (bool): Whether file is readable
    - size (int, optional): File size in bytes (if exists and readable)
    - error (str, optional): Error message (if not readable)
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
verification = client.cert.verify_certificate_files()
print(f"Verification status: {verification['status']}")
for file_type, info in verification["files"].items():
    if info["exists"]:
        print(f"{file_type}: exists, readable: {info['readable']}")
    else:
        print(f"{file_type}: missing")
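To make the result shape concrete, the following is a minimal standard-library sketch that produces a report with the same fields. The helper `verify_files` is hypothetical, and the rule used to pick "success", "partial" or "failed" (all, some, or none of the files readable) is an assumption, since this page does not spell out how the SDK decides.

```python
import os


def verify_files(paths):
    """Check each path for existence and readability and summarise the result."""
    files = {}
    for name, path in paths.items():
        info = {"exists": os.path.isfile(path), "readable": False}
        if info["exists"]:
            try:
                with open(path, "rb") as handle:
                    info["size"] = len(handle.read())
                info["readable"] = True
            except OSError as exc:
                info["error"] = str(exc)
        files[name] = info

    readable_count = sum(1 for info in files.values() if info["readable"])
    if readable_count == len(files):
        status = "success"
    elif readable_count == 0:
        status = "failed"
    else:
        status = "partial"
    return {"status": status, "files": files}


report = verify_files({
    "client_cert": "certs/client.pem",
    "client_key": "certs/client-key.pem",
    "ca_cert": "certs/ca.pem",
})
print(report["status"])
```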
check_expiry()
Check certificate expiry status and calculate days remaining.
Required parameters:
- certificate (Dict[str, Any]): Certificate dictionary from get_certificates
  - serial (str): Certificate serial number
  - certificate (Dict): Certificate details
    - state (str): Certificate state
    - cert (str): Base64-encoded certificate data
    - pubkey (str): Base64-encoded public key data
Returns: Dict[str, Any] with the following fields:
- expired (bool): Whether certificate has expired
- valid_until (str): ISO datetime string of expiry date (if parseable)
- days_remaining (int): Days until expiry (0 if expired)
- message (str): Human-readable status message
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
cert_response = client.cert.get_certificates(owner=wallet.address)
certificates = cert_response.get('certificates', [])
if certificates:
    cert = certificates[0]
    expiry_info = client.cert.check_expiry(cert)
    print(f"Certificate status: {expiry_info['message']}")
    print(f"Expired: {expiry_info['expired']}")
    if expiry_info['valid_until']:
        print(f"Valid until: {expiry_info['valid_until']}")
    if expiry_info['days_remaining'] is not None:
        if expiry_info['days_remaining'] < 30:
            print(f"Warning: certificate expires in {expiry_info['days_remaining']} days")
        else:
            print(f"Certificate valid for {expiry_info['days_remaining']} days")
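The relationship between valid_until and days_remaining can be reproduced with the standard library. The following is a minimal sketch that computes whole days until an ISO datetime string, floored at 0 as in the field description above. The helper `days_until` is hypothetical, and treating timestamps without a timezone as UTC is an assumption.

```python
from datetime import datetime, timezone


def days_until(valid_until, now=None):
    """Return whole days from `now` until the ISO timestamp, never below 0."""
    expiry = datetime.fromisoformat(valid_until)
    if expiry.tzinfo is None:
        # Assumption: timestamps without timezone information are UTC.
        expiry = expiry.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max((expiry - now).days, 0)


reference = datetime(2025, 1, 1, tzinfo=timezone.utc)
print(days_until("2025-01-31T00:00:00+00:00", now=reference))  # 30
print(days_until("2024-12-01T00:00:00", now=reference))        # 0
```

Passing an explicit `now` keeps the calculation reproducible, which helps when testing renewal thresholds such as the 30-day warning in the example above.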
validate_certificate()
Validate certificate structure and data.
Required parameters:
- cert_data (Dict): Certificate data to validate
  - cert (str): Certificate in PEM format
  - key (str): Private key in PEM format
  - serial (str, optional): Certificate serial number
Returns: bool - True if valid, False otherwise
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
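# Example owner address and serial; replace with your own values
owner = "akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4"
serial = "abc123def456"
cert = client.cert.get_certificate(owner, serial)
if cert:
    is_valid = client.cert.validate_certificate(cert)
    print(f"Certificate valid: {is_valid}")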
ensure_certificate()
Ensure a valid certificate exists for mTLS communication. This function validates existing certificates and creates new ones if needed.
Validation performed:
- Checks if certificate has expired or is not yet valid
- Verifies certificate serial number exists on blockchain
- Validates private key matches certificate public key
- Removes invalid certificates and creates new ones automatically
Required parameters:
- wallet (AkashWallet): Wallet instance
Returns: tuple with the following elements:
- success (bool): Whether certificate is ready for use
- cert_pem (str): Certificate in PEM format (or None if failed)
- key_pem (str): Private key in PEM format (or None if failed)
Example:
from akash import AkashClient, AkashWallet
client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")
success, cert_pem, key_pem = client.cert.ensure_certificate(wallet)
if success:
    print("Certificate validated and ready for use")
else:
    print("Failed to ensure certificate")
This function checks the certificate cache first, then validates the local certificate files against the blockchain. If validation fails (the certificate is expired, is not on the blockchain, or does not match the private key), it automatically creates a new certificate.
generate_certificate_serial()
Generate a deterministic serial number for a certificate.
Required parameters:
- owner (str): Certificate owner address
- cert_data (bytes): Certificate data
Returns: str - Certificate serial number
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
with open("cert.pem", "rb") as f:
    cert_data = f.read()
serial = client.cert.generate_certificate_serial(
    owner="akash1...",
    cert_data=cert_data
)
print(f"Certificate serial: {serial}")
cleanup_certificates()
Clean up locally stored certificate files from the certs/ directory.
Returns: Dict[str, Any] with the following fields:
- status (str): Cleanup status ("success" or "error")
- cleaned_files (List[str]): List of cleaned file names
- message (str): Status message
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.cert.cleanup_certificates()
if result["status"] == "success":
    print(f"Cleaned up {len(result['cleaned_files'])} certificate files from certs/ directory")
    print(f"Removed files: {', '.join(result['cleaned_files'])}")
else:
    print(f"Cleanup failed: {result.get('error', 'Unknown error')}")
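As an illustration of what a cleanup step of this kind does, here is a minimal standard-library sketch that removes the three default file names from a directory and reports the result in the documented shape. The helper `remove_certificate_files` is hypothetical and is not the SDK's implementation; the demonstration runs in a temporary directory so it never touches a real certs/ directory.

```python
import os
import tempfile

DEFAULT_NAMES = ("client.pem", "client-key.pem", "ca.pem")


def remove_certificate_files(directory, names=DEFAULT_NAMES):
    """Delete the named files from `directory`, skipping any that are absent."""
    cleaned = []
    try:
        for name in names:
            path = os.path.join(directory, name)
            if os.path.isfile(path):
                os.remove(path)
                cleaned.append(name)
    except OSError as exc:
        return {"status": "error", "cleaned_files": cleaned, "message": str(exc)}
    return {
        "status": "success",
        "cleaned_files": cleaned,
        "message": f"Removed {len(cleaned)} file(s)",
    }


# Demonstration in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    open(os.path.join(tmp, "client.pem"), "w").close()
    outcome = remove_certificate_files(tmp)
    print(outcome["cleaned_files"])  # ['client.pem']
```

Note that deleting the local private key does not revoke the certificate on the blockchain; revocation is a separate transaction, as described under revoke_client_certificate().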