Deployment module

The deployment module provides functionality for creating and managing deployments on the Akash Network.

Transaction parameters

All transaction functions support optional parameters like fee_amount, gas_limit, and gas_adjustment. See Transaction parameters for details.

DeploymentClient class

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

deployment = client.deployment

Transactions

create_deployment()

Create deployment from SDL YAML.

def create_deployment(wallet, sdl_yaml: str, deposit: str = "5000000",
                      deposit_denom: str = "uakt", version: str = None,
                      memo: str = "", fee_amount: str = None, gas_limit: int = None,
                      gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet instance for signing transaction
  • sdl_yaml (str): SDL YAML content as string

Optional parameters:

  • deposit (str): Deposit amount (default: "5000000")
  • deposit_denom (str): Deposit token denomination (default: "uakt")
  • version (str): Deployment version (calculated from SDL if not provided)
  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: None)
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)
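The interaction between simulation and gas_adjustment can be pictured with a small arithmetic sketch. It assumes the common Cosmos-style approach, where the simulated gas figure is multiplied by the adjustment factor; the helper name adjusted_gas_limit and the round-up behaviour are assumptions made for illustration, not the SDK's confirmed implementation.

```python
import math


def adjusted_gas_limit(simulated_gas: int, gas_adjustment: float = 1.2) -> int:
    """Scale a simulated gas figure by a safety multiplier.

    Hypothetical helper: rounding up is a choice made for this sketch,
    the SDK may round differently.
    """
    if simulated_gas < 0:
        raise ValueError("simulated_gas must not be negative")
    if gas_adjustment <= 0:
        raise ValueError("gas_adjustment must be positive")
    return math.ceil(simulated_gas * gas_adjustment)


# A simulation that reports 100,000 gas with a 1.5x safety margin.
print(adjusted_gas_limit(100_000, 1.5))
```

A larger multiplier lowers the risk of an out-of-gas failure at the cost of a higher gas limit; passing gas_limit explicitly presumably bypasses this estimate.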

Returns: BroadcastResult - Transaction result with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value
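Because events is a list of dictionaries with nested attributes, a small lookup helper can save repeated loops. The sketch below is not part of the SDK: find_event_attribute is a hypothetical helper, and the event types and keys in the sample data are invented for illustration, so check the events of a real transaction for the actual names.

```python
from typing import Any, Dict, List, Optional


def find_event_attribute(
    events: List[Dict[str, Any]], event_type: str, key: str
) -> Optional[str]:
    """Return the first attribute value matching event_type and key, or None."""
    for event in events:
        if event.get("type") != event_type:
            continue
        for attribute in event.get("attributes", []):
            if attribute.get("key") == key:
                return attribute.get("value")
    return None


# Hypothetical sample shaped like BroadcastResult.events (names are made up).
sample_events = [
    {"type": "message", "attributes": [{"key": "action", "value": "example-action"}]},
    {"type": "example.deployment", "attributes": [{"key": "dseq", "value": "12345"}]},
]

print(find_event_attribute(sample_events, "example.deployment", "dseq"))
```

With a real result, the same call would be made on result.events.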

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

sdl_yaml = '''
version: "2.0"
services:
  web:
    image: nginx:latest
    expose:
      - port: 80
        as: 80
        to:
          - global: true
profiles:
  compute:
    web:
      resources:
        cpu:
          units: 0.1
        memory:
          size: 128Mi
        storage:
          size: 512Mi
  placement:
    akash:
      pricing:
        web:
          denom: uakt
          amount: 1000
deployment:
  web:
    akash:
      profile: web
      count: 1
'''

result = client.deployment.create_deployment(
    wallet=wallet,
    sdl_yaml=sdl_yaml,
    deposit="5000000"
)

if result.success:
    print(f"Deployment created: {result.tx_hash}")
else:
    print(f"Failed to create deployment: {result.raw_log}")

⚠️ Note: To access deployment logs later, you'll need a client certificate. Create one with client.cert.create_certificate(wallet) before deploying. See Certificate documentation for details.

Using USDC deposits:

The SDK supports deposits in both AKT and USDC. The deposit_denom parameter must match the denom specified in your SDL pricing section.

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443", "akashnet-2")

USDC_DENOM = "ibc/170C677610AC31DF0904FFE09CD3B5C657492170E7E52372E48756B71E56F2F1"

sdl_yaml = '''
version: "2.0"
services:
  web:
    image: nginx:latest
    expose:
      - port: 80
        as: 80
        to:
          - global: true
profiles:
  compute:
    web:
      resources:
        cpu:
          units: 0.1
        memory:
          size: 128Mi
        storage:
          size: 512Mi
  placement:
    akash:
      pricing:
        web:
          denom: ibc/170C677610AC31DF0904FFE09CD3B5C657492170E7E52372E48756B71E56F2F1
          amount: 500000 
deployment:
  web:
    akash:
      profile: web
      count: 1
'''

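result = client.deployment.create_deployment(
    wallet=wallet,
    sdl_yaml=sdl_yaml,
    deposit="500000",          # deposit amount, in the USDC denom
    deposit_denom=USDC_DENOM,  # must match the denom in the SDL pricing section
    fee_amount="7000"          # transaction fees are always paid in uakt
)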

if result.success:
    print(f"Deployment created: {result.tx_hash}")
else:
    print(f"Failed to create deployment: {result.raw_log}")

⚠️ Important: The deposit_denom must exactly match the denomination in your SDL pricing section. Transaction fees (fee_amount) are always paid in uakt regardless of deployment denomination.
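A mismatch between deposit_denom and the SDL pricing denom can be caught before broadcasting. The sketch below assumes the SDL has already been parsed into a dictionary (for example with a YAML parser) and follows the profiles/placement/pricing layout shown in the examples above; pricing_denoms and check_deposit_denom are hypothetical helpers, not SDK functions.

```python
from typing import Any, Dict, Set


def pricing_denoms(sdl: Dict[str, Any]) -> Set[str]:
    """Collect every denom used under profiles.placement.*.pricing.*."""
    denoms: Set[str] = set()
    placements = sdl.get("profiles", {}).get("placement", {})
    for placement in placements.values():
        for price in placement.get("pricing", {}).values():
            denoms.add(price["denom"])
    return denoms


def check_deposit_denom(sdl: Dict[str, Any], deposit_denom: str) -> None:
    """Raise ValueError unless the SDL prices everything in deposit_denom."""
    denoms = pricing_denoms(sdl)
    if denoms != {deposit_denom}:
        raise ValueError(
            f"deposit_denom {deposit_denom!r} does not match SDL pricing denoms {sorted(denoms)}"
        )


# Parsed form of the pricing section from the AKT example above.
sample_sdl = {
    "profiles": {
        "placement": {
            "akash": {"pricing": {"web": {"denom": "uakt", "amount": 1000}}}
        }
    }
}

check_deposit_denom(sample_sdl, "uakt")  # passes silently
print(pricing_denoms(sample_sdl))
```

Running the check right before create_deployment() turns a rejected transaction into a local error.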

update_deployment()

Update deployment with SDL YAML content.

def update_deployment(wallet, sdl_yaml: str, owner: str = None, dseq: int = None,
                      memo: str = "", fee_amount: str = None, gas_limit: int = None,
                      gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet instance for signing transaction
  • sdl_yaml (str): SDL YAML content as string
  • dseq (int): Deployment sequence number (the signature defaults it to None, but a value must be supplied)

Optional parameters:

  • owner (str): Deployment owner address (default: wallet.address)
  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: None)
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)

Returns: BroadcastResult - Transaction result with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

updated_sdl_yaml = '''
version: "2.0"
services:
  web:
    image: nginx:1.21
    command: ["/bin/sh", "-c", "echo 'Updated container' && nginx -g 'daemon off;'"]
    env:
      - NGINX_WORKER_PROCESSES=auto
      - UPDATE_VERSION=v2
    expose:
      - port: 80
        as: 80
        to:
          - global: true
profiles:
  compute:
    web:
      resources:
        cpu:
          units: 0.2
        memory:
          size: 256Mi
        storage:
          size: 1Gi
  placement:
    akash:
      pricing:
        web:
          denom: uakt
          amount: 1500
deployment:
  web:
    akash:
      profile: web
      count: 1
'''

result = client.deployment.update_deployment(
    wallet=wallet,
    sdl_yaml=updated_sdl_yaml,
    dseq=12345
)

if result.success:
    print(f"Deployment updated: {result.tx_hash}")
else:
    print(f"Update failed: {result.raw_log}")

close_deployment()

Close a deployment and return its deposit.

def close_deployment(wallet, owner: str, dseq: int,
                     memo: str = "", fee_amount: str = None,
                     gas_limit: int = None, gas_adjustment: float = 1.2,
                     use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet instance for signing transaction
  • owner (str): Deployment owner address
  • dseq (int): Deployment sequence number

Optional parameters:

  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Fee amount in uakt (default: None)
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for simulated gas (default: 1.2)
  • use_simulation (bool): Enable/disable gas simulation (default: True)

Returns: BroadcastResult - Transaction result with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

result = client.deployment.close_deployment(
    wallet=wallet,
    owner=wallet.address,
    dseq=12345
)

if result.success:
    print(f"Deployment closed: {result.tx_hash}")

deposit_deployment()

Add funds to a deployment's deposit (escrow) account.

def deposit_deployment(wallet, owner: str, dseq: int, amount: str,
                       denom: str = "uakt", memo: str = "",
                       fee_amount: str = None, gas_limit: int = None,
                       gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet for signing transaction
  • owner (str): Deployment owner address
  • dseq (int): Deployment sequence number
  • amount (str): Amount to deposit

Optional parameters:

  • denom (str): Token denomination (default: "uakt")
  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: None)
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)

Returns: BroadcastResult - Transaction result with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

result = client.deployment.deposit_deployment(
    wallet=wallet,
    owner=wallet.address,
    dseq=12345,
    amount="2000000"
)

if result.success:
    print(f"Deposit added: {result.tx_hash}")
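The amounts in these examples are expressed in uakt, the micro-denomination of AKT (1 AKT = 1,000,000 uakt). A minimal conversion sketch follows; akt_to_uakt is a hypothetical helper, not part of the SDK, and it uses Decimal to avoid floating-point rounding.

```python
from decimal import Decimal

UAKT_PER_AKT = 1_000_000  # 1 AKT = 1,000,000 uakt


def akt_to_uakt(akt) -> str:
    """Convert an AKT amount (str, int or Decimal) to a uakt string."""
    amount = Decimal(str(akt)) * UAKT_PER_AKT
    if amount <= 0:
        raise ValueError("amount must be positive")
    if amount != amount.to_integral_value():
        raise ValueError("AKT amounts support at most 6 decimal places")
    return str(int(amount))


print(akt_to_uakt("2"))    # suitable for amount= in deposit_deployment()
print(akt_to_uakt("0.5"))
```

The string result can be passed directly as the amount or deposit argument when the denomination is uakt.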

close_group()

Close a specific group within a deployment.

def close_group(wallet, owner: str, dseq: int, gseq: int,
                memo: str = "", fee_amount: str = None, gas_limit: int = None,
                gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet for signing transaction
  • owner (str): Deployment owner address
  • dseq (int): Deployment sequence number
  • gseq (int): Group sequence number

Optional parameters:

  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: None)
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)

Returns: BroadcastResult - Transaction result with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

result = client.deployment.close_group(
    wallet=wallet,
    owner=wallet.address,
    dseq=12345,
    gseq=1
)

if result.success:
    print(f"Group closed: {result.tx_hash}")

pause_group()

Pause a specific group within a deployment.

def pause_group(wallet, owner: str, dseq: int, gseq: int,
                memo: str = "", fee_amount: str = None, gas_limit: int = None,
                gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet for signing transaction
  • owner (str): Deployment owner address
  • dseq (int): Deployment sequence number
  • gseq (int): Group sequence number

Optional parameters:

  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: None)
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)

Returns: BroadcastResult - Transaction result with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

result = client.deployment.pause_group(
    wallet=wallet,
    owner=wallet.address,
    dseq=12345,
    gseq=1
)

if result.success:
    print(f"Group paused: {result.tx_hash}")

start_group()

Start a specific group within a deployment.

def start_group(wallet, owner: str, dseq: int, gseq: int,
                memo: str = "", fee_amount: str = None, gas_limit: int = None,
                gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult

Required parameters:

  • wallet (AkashWallet): Wallet for signing transaction
  • owner (str): Deployment owner address
  • dseq (int): Deployment sequence number
  • gseq (int): Group sequence number

Optional parameters:

  • memo (str): Transaction memo (default: "")
  • fee_amount (str): Transaction fee amount in uakt (default: None)
  • gas_limit (int): Gas limit override (default: None)
  • gas_adjustment (float): Multiplier for gas estimation (default: 1.2)
  • use_simulation (bool): Enable gas simulation (default: True)

Returns: BroadcastResult - Transaction result with the following fields:

  • tx_hash (str): Transaction hash
  • code (int): Response code (0 for success)
  • raw_log (str): Raw transaction log/error message
  • success (bool): Whether transaction succeeded
  • events (List[Dict]): Transaction events
    • type (str): Event type
    • attributes (List[Dict]): Event attributes
      • key (str): Attribute key
      • value (str): Attribute value

Example:

from akash import AkashClient, AkashWallet

wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")

result = client.deployment.start_group(
    wallet=wallet,
    owner=wallet.address,
    dseq=12345,
    gseq=1
)

if result.success:
    print(f"Group started: {result.tx_hash}")

Queries

get_deployments()

List deployments with optional filtering.

def get_deployments(owner: Optional[str] = None, limit: Optional[int] = None,
                    offset: Optional[int] = None, count_total: bool = False) -> List[Dict[str, Any]]

Optional parameters:

  • owner (str): Deployment owner filter (default: None)
  • limit (int): Maximum number of results to return (default: None)
  • offset (int): Number of results to skip (default: None)
  • count_total (bool): Include total count in response (default: False)

Returns: List[Dict[str, Any]] - List of deployments with the following structure:

  • deployment (Dict): Deployment information
    • deployment_id (Dict): Deployment identifier
      • owner (str): Deployment owner address
      • dseq (int): Deployment sequence number
    • state (int): Deployment state (1=active, 2=closed)
    • version (str): Deployment version hash
    • created_at (int): Block height when deployment was created
  • groups (List[Dict]): List of groups in deployment
    • group_id (Dict): Group identifier
      • owner (str): Deployment owner address
      • dseq (int): Deployment sequence number
      • gseq (int): Group sequence number
    • state (int): Group state (1=open, 2=paused, 3=insufficient_funds, 4=closed)
    • name (str): Group name
    • created_at (int): Block height when group was created
  • escrow_account (Dict): Escrow account information
    • balance (str): Account balance in AKT
    • raw_balance (str): Account balance in uakt (micro AKT)
    • state (int): Account state (1=active, 2=closed)
    • transferred (Dict): Transferred amount
      • denom (str): Token denomination
      • amount (str): Transferred amount
    • settled_at (int): Settlement timestamp

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

deployments = client.deployment.get_deployments(limit=10)

print(f"Found {len(deployments)} deployments")
for deployment in deployments:
    dseq = deployment['deployment']['deployment_id']['dseq']
    owner = deployment['deployment']['deployment_id']['owner']
    state = deployment['deployment']['state']
    print(f"DSEQ {dseq} (Owner: {owner}) - State: {state}")

my_deployments = client.deployment.get_deployments(
    owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
    limit=5
)

print(f"Found {len(my_deployments)} deployments for owner")
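The limit and offset parameters can be combined to walk through a long result set page by page. The sketch below shows the pattern with a stand-in fetch function so that it runs without network access; iter_all and fake_fetch are hypothetical names, and it assumes offset counts items rather than pages.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_all(
    fetch_page: Callable[[int, int], List[Dict[str, Any]]], page_size: int = 50
) -> Iterator[Dict[str, Any]]:
    """Yield every item by calling fetch_page(limit, offset) until a short page."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        yield from page
        if len(page) < page_size:  # last (possibly empty) page reached
            break
        offset += page_size


# Stand-in data source with 7 fake deployments (not real chain data).
fake_deployments = [{"deployment": {"deployment_id": {"dseq": n}}} for n in range(7)]


def fake_fetch(limit: int, offset: int) -> List[Dict[str, Any]]:
    return fake_deployments[offset:offset + limit]


print(len(list(iter_all(fake_fetch, page_size=3))))
```

With the real client, fetch_page could be a lambda that forwards to client.deployment.get_deployments(owner=..., limit=limit, offset=offset).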

get_deployment()

Get deployment information.

def get_deployment(owner: str, dseq: int) -> Dict[str, Any]

Required parameters:

  • owner (str): Deployment owner address
  • dseq (int): Deployment sequence number

Returns: Dict[str, Any] - Deployment information with the following structure:

  • deployment (Dict): Deployment information
    • deployment_id (Dict): Deployment identifier
      • owner (str): Deployment owner address
      • dseq (int): Deployment sequence number
    • state (int): Deployment state (1=active, 2=closed)
    • version (str): Deployment version hash
    • created_at (int): Block height when deployment was created
  • groups (List[Dict]): List of groups in deployment
    • group_id (Dict): Group identifier
      • owner (str): Deployment owner address
      • dseq (int): Deployment sequence number
      • gseq (int): Group sequence number
    • state (int): Group state (1=open, 2=paused, 3=insufficient_funds, 4=closed)
    • name (str): Group name
  • escrow_account (Dict): Escrow account information
    • balance (str): Account balance in AKT
    • raw_balance (str): Account balance in uakt (micro AKT)
    • state (int): Account state (1=active, 2=closed)
    • transferred (Dict): Transferred amount
      • denom (str): Token denomination
      • amount (str): Transferred amount
    • settled_at (int): Settlement timestamp

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

deployment = client.deployment.get_deployment(
    owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
    dseq=12345
)

if deployment:
    dep_info = deployment['deployment']
    dep_id = dep_info['deployment_id']
    escrow = deployment['escrow_account']

    print(f"Deployment {dep_id['dseq']}:")
    print(f"Owner: {dep_id['owner']}")
    print(f"State: {dep_info['state']} (1=active, 2=closed)")
    print(f"Version: {dep_info['version']}")
    print(f"Escrow Balance: {escrow['balance']} AKT")
    print(f"Groups: {len(deployment['groups'])}")

    for group in deployment['groups']:
        gseq = group['group_id']['gseq']
        state = group['state']
        name = group['name']
        print(f"Group {gseq}: {name} (state {state})")
else:
    print("Deployment not found")
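The numeric state codes are easier to read once mapped to labels. The sketch below uses the code-to-label tables listed in the return structure above; describe_deployment is a hypothetical helper and the sample record is invented, shaped like the documented response.

```python
from typing import Any, Dict, List

# State codes as listed in the return structure above.
DEPLOYMENT_STATES = {1: "active", 2: "closed"}
GROUP_STATES = {1: "open", 2: "paused", 3: "insufficient_funds", 4: "closed"}


def describe_deployment(record: Dict[str, Any]) -> List[str]:
    """Return human-readable lines for a deployment and its groups."""
    dep = record["deployment"]
    state = DEPLOYMENT_STATES.get(dep["state"], f"unknown({dep['state']})")
    lines = [f"DSEQ {dep['deployment_id']['dseq']}: {state}"]
    for group in record.get("groups", []):
        group_state = GROUP_STATES.get(group["state"], f"unknown({group['state']})")
        lines.append(f"  group {group['group_id']['gseq']} ({group['name']}): {group_state}")
    return lines


# Invented sample record (not real chain data).
sample = {
    "deployment": {"deployment_id": {"owner": "akash1...", "dseq": 12345}, "state": 1},
    "groups": [{"group_id": {"gseq": 1}, "state": 2, "name": "akash"}],
}

print("\n".join(describe_deployment(sample)))
```

Unknown codes are reported as unknown(n) rather than raising, so the helper keeps working if new states appear.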

Utils

validate_sdl()

Validate SDL (Stack Definition Language) structure and contents.

def validate_sdl(sdl: Dict[str, Any]) -> Dict[str, Any]

Required parameters:

  • sdl (Dict[str, Any]): SDL configuration dictionary

Returns: Dict[str, Any] - Validation result with the following structure:

  • valid (bool): Whether SDL is valid
  • errors (List[str]): List of validation errors
  • warnings (List[str]): List of validation warnings
  • details (Dict): Detailed validation information
    • version (str): SDL version from input
    • services (List[str]): List of service names from services section
    • resources (Dict): Resource specifications by service name
      • service_name (Dict): Service resource details
        • cpu (str): CPU units value or "N/A"
        • memory (str): Memory size value or "N/A"
        • storage (str): Storage size value or "N/A"
    • pricing (Dict): Pricing specifications by placement name
      • placement_name (Dict): Placement pricing details
        • service_name (str): Formatted pricing string "amount denom"
    • deployment (Dict): Deployment configuration by service name
      • service_name (Dict): Placement configuration for service
        • placement_name (Dict): Placement details
          • profile (str): Profile name to use
          • count (int): Number of instances

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

sdl = {
    "version": "2.0",
    "services": {
        "web": {
            "image": "nginx:latest",
            "expose": [{"port": 80, "as": 80, "to": [{"global": True}]}]
        }
    },
    "profiles": {
        "compute": {
            "web": {
                "resources": {
                    "cpu": {"units": "0.1"},
                    "memory": {"size": "128Mi"},
                    "storage": {"size": "512Mi"}
                }
            }
        },
        "placement": {
            "akash": {
                "pricing": {
                    "web": {"denom": "uakt", "amount": "1000"}
                }
            }
        }
    },
    "deployment": {
        "web": {"akash": {"profile": "web", "count": 1}}
    }
}

validation = client.deployment.validate_sdl(sdl)

if validation['valid']:
    print("SDL is valid")
    print(f"Services: {validation['details']['services']}")
else:
    print("SDL validation failed:")
    for error in validation['errors']:
        print(f"Error: {error}")

if validation['warnings']:
    print("Warnings:")
    for warning in validation['warnings']:
        print(f"Warning: {warning}")

get_service_logs()

Get logs from a service running in your deployment using WebSocket streaming.

def get_service_logs(provider_endpoint: str, lease_id: Dict[str, Any], 
                     service_name: str = None, tail: int = 100, 
                     cert_pem: str = None, key_pem: str = None, timeout: int = 30) -> List[str]

Required parameters:

  • provider_endpoint (str): Provider's HTTPS endpoint (e.g., "provider.akash.network:8443")
  • lease_id (Dict): Lease identifier dictionary with the following fields:
    • owner (str): Deployment owner address
    • dseq (int): Deployment sequence number
    • gseq (int): Group sequence number
    • oseq (int): Order sequence number
    • provider (str): Provider address

Optional parameters:

  • service_name (str): Specific service name to get logs from (default: None for all services)
  • tail (int): Number of recent log lines to retrieve (default: 100)
  • cert_pem (str): Client certificate in PEM format for mTLS authentication (default: auto-load from client store)
  • key_pem (str): Client private key in PEM format for mTLS authentication (default: auto-load from client store)
  • timeout (int): WebSocket connection timeout in seconds (default: 30)

Returns: List[str] - List of log lines from the service(s)

⚠️ Note: Not all providers capture application logs. Ensure your deployment produces logs and the provider supports log capture before expecting results.

Example:

from akash import AkashClient, AkashWallet

client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")

lease_id = {
    "owner": wallet.address,
    "dseq": 12345,
    "gseq": 1,
    "oseq": 1,
    "provider": "akash1..."
}

logs = client.deployment.get_service_logs(
    provider_endpoint="provider.europlots.com:8443",
    lease_id=lease_id,
    service_name="web",
    tail=50
)

print(f"Retrieved {len(logs)} log lines:")
for line in logs:
    print(line)

stream_service_logs()

Stream logs from a service running in your deployment in real-time using WebSocket streaming.

def stream_service_logs(provider_endpoint: str, lease_id: Dict[str, Any],
                        service_name: str = None, follow: bool = True,
                        cert_pem: str = None, key_pem: str = None, timeout: int = 300)

Required parameters:

  • provider_endpoint (str): Provider's HTTPS endpoint (e.g., "provider.akash.network:8443")
  • lease_id (Dict): Lease identifier dictionary with the following fields:
    • owner (str): Deployment owner address
    • dseq (int): Deployment sequence number
    • gseq (int): Group sequence number
    • oseq (int): Order sequence number
    • provider (str): Provider address

Optional parameters:

  • service_name (str): Specific service name to stream logs from (default: None for all services)
  • follow (bool): Whether to keep streaming new logs like tail -f (default: True)
  • cert_pem (str): Client certificate in PEM format for mTLS authentication (default: auto-load from client store)
  • key_pem (str): Client private key in PEM format for mTLS authentication (default: auto-load from client store)
  • timeout (int): Maximum streaming duration in seconds (default: 300)

Yields: str - Log lines as they are received from the service in real-time

⚠️ Note: Not all providers capture application logs. With follow=True, the stream continues until the timeout elapses or you interrupt it manually. Deployment owners can access their deployment logs with a valid client certificate.

Example:

from akash import AkashClient, AkashWallet

client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")

lease_id = {
    "owner": wallet.address,
    "dseq": 12345,
    "gseq": 1,
    "oseq": 1,
    "provider": "akash1..."
}

print("Streaming logs (Ctrl+C to stop)...")
for log_line in client.deployment.stream_service_logs(
    provider_endpoint="provider.europlots.com:8443",
    lease_id=lease_id,
    service_name="web",
    follow=True
):
    print(f"[LOG] {log_line}")
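Since the stream is consumed like any Python iterator, it can be bounded on the consumer side instead of relying only on timeout. The sketch below demonstrates this with a stand-in generator so that it runs offline; take_lines and fake_stream are hypothetical names, not SDK functions.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Optional


def take_lines(
    stream: Iterable[str], max_lines: int, contains: Optional[str] = None
) -> List[str]:
    """Collect at most max_lines lines, optionally keeping only matching ones."""
    matching = (line for line in stream if contains is None or contains in line)
    return list(islice(matching, max_lines))


def fake_stream() -> Iterator[str]:
    """Endless stand-in for a log stream: odd lines are info, even lines are errors."""
    n = 0
    while True:
        n += 1
        yield f"line {n}" if n % 2 else f"error {n}"


print(take_lines(fake_stream(), 3, contains="error"))
```

Note that with a filter, the call keeps reading until enough matching lines arrive, so a rarely matching filter still depends on the stream's own timeout to end.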

Deployment log functionality requires mTLS authentication using client certificates. See Certificate documentation for setup details. The SDK automatically uses certificates from the client store.

Manifest submission

submit_manifest_to_provider()

Submit deployment manifest to provider with automatic protocol fallback.

def submit_manifest_to_provider(provider_endpoint: str, lease_id: Dict[str, Any],
                                manifest: Dict[str, Any], cert_pem: str = None,
                                key_pem: str = None, timeout: int = 60, 
                                use_http: bool = False) -> Dict[str, Any]

Required parameters:

  • provider_endpoint (str): Provider endpoint (e.g., "provider.akash.network:8443")
  • lease_id (Dict[str, Any]): Lease identifier dictionary
    • owner (str, required): Deployment owner address
    • dseq (str or int, required): Deployment sequence number
    • gseq (str or int, required): Group sequence number
    • oseq (str or int, required): Order sequence number
    • provider (str, required): Provider address
  • manifest (Dict[str, Any]): Manifest data dictionary
    • version (str, required): Manifest version (e.g., "2.0")
    • services (Dict[str, Dict], required): Service definitions (keys are service names like "web", "api")
      • image (str, required): Docker image name
      • expose (List[Dict], optional): Port exposure configuration
        • port (int, required): Internal port number
        • as (int, required): External port number
        • to (List[Dict], required): Exposure targets
          • global (bool, optional): Expose globally
          • service (str, optional): Expose to specific service
      • env (List[str], optional): Environment variables
      • command (List[str], optional): Container command override
    • profiles (Dict[str, Dict], required): Resource and placement profiles
      • compute (Dict[str, Dict], required): Compute resource profiles (keys are profile names like "web", "api")
        • resources (Dict, required): Resource requirements
          • cpu (Dict, required): CPU specification
            • units (str or float, required): CPU units
          • memory (Dict, required): Memory specification
            • quantity (Dict, required): Memory quantity
              • val (str, required): Memory value (e.g., "256Mi")
          • storage (Dict or List[Dict], required): Storage specification
            • quantity (Dict, required): Storage quantity (if single)
              • val (str, required): Storage value (e.g., "1Gi")
            • size (str, optional): Storage size (alternative format)
            • attributes (Dict, optional): Storage attributes
              • persistent (str, optional): Persistence setting
              • class (str, optional): Storage class
      • placement (Dict[str, Dict], required): Placement profiles (keys are placement names like "akash")
        • attributes (Dict[str, str], optional): Provider attribute requirements as key-value pairs
        • pricing (Dict[str, Dict], required): Pricing for services (keys are service names)
          • denom (str, required): Token denomination (e.g., "uakt")
          • amount (str, required): Price amount
    • deployment (Dict[str, Dict], required): Deployment mapping (keys are service names)
      • profile (str, required): Compute profile name
      • count (int, required): Instance count
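Given how many nested fields are required, a quick local check of the top-level keys can catch mistakes before contacting the provider. The sketch below only verifies the lease_id fields and the four manifest sections named above; preflight_errors is a hypothetical helper, and it does not replace the validation done by the SDK or the provider.

```python
from typing import Any, Dict, List

REQUIRED_LEASE_FIELDS = ("owner", "dseq", "gseq", "oseq", "provider")
REQUIRED_MANIFEST_SECTIONS = ("version", "services", "profiles", "deployment")


def preflight_errors(lease_id: Dict[str, Any], manifest: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in the top-level structure (empty if none)."""
    errors = []
    for key in REQUIRED_LEASE_FIELDS:
        if key not in lease_id:
            errors.append(f"lease_id is missing '{key}'")
    for key in REQUIRED_MANIFEST_SECTIONS:
        if key not in manifest:
            errors.append(f"manifest is missing '{key}'")
    return errors


# Deliberately incomplete inputs to show the output.
lease_id = {"owner": "akash1...", "dseq": 12345, "gseq": 1, "oseq": 1}
manifest = {"version": "2.0", "services": {}, "profiles": {}}

for problem in preflight_errors(lease_id, manifest):
    print(problem)
```

An empty list means the top-level shape is complete; nested fields still need to follow the structure listed above.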

Optional parameters:

  • cert_pem (str): Client certificate PEM (default: auto-load from cert store)
  • key_pem (str): Client private key PEM (default: auto-load from cert store)
  • timeout (int): Request timeout in seconds (default: 60)
  • use_http (bool): Skip the gRPC attempt and submit over HTTP directly (default: False)

Returns: Dict[str, Any] - Submission result with the following fields:

  • status (str): Submission status ("success" or "error")
  • method (str, optional): Protocol used ("gRPC" or "HTTP")
  • error (str, optional): Error message if status is "error"

Example:

from akash import AkashClient, AkashWallet

client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")

provider_address = "akash1..."  # placeholder: address of the provider holding the lease

lease_id = {
    "owner": wallet.address,
    "dseq": 12345,
    "gseq": 1,
    "oseq": 1,
    "provider": provider_address
}

manifest = {
    "version": "2.0",
    "services": {
        "web": {
            "image": "nginx:latest",
            "expose": [{"port": 80, "as": 80, "to": [{"global": True}]}]
        }
    },
    "profiles": {
        "compute": {
            "web": {
                "resources": {
                    "cpu": {"units": 0.1},
                    "memory": {"quantity": {"val": "512Mi"}},
                    "storage": {"quantity": {"val": "1Gi"}}
                }
            }
        },
        "placement": {
            "akash": {"pricing": {"web": {"denom": "uakt", "amount": "100"}}}
        }
    },
    "deployment": {
        "web": {"akash": {"profile": "web", "count": 1}}
    }
}

result = client.deployment.submit_manifest_to_provider(
    provider_endpoint="provider.europlots.com:8443",  # example endpoint; use your provider's
    lease_id=lease_id,
    manifest=manifest
)

if result["status"] == "success":
    print(f"Manifest submitted via {result.get('method', 'gRPC')}")
else:
    print(f"Manifest submission failed: {result['error']}")

Protocol fallback:

The method tries gRPC first and falls back to HTTP PUT if gRPC fails. This matches the console approach and maximizes provider compatibility. Set use_http=True to skip gRPC and use HTTP directly.
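The fallback behaviour follows a general try-primary-then-secondary pattern, sketched below. This is an illustration of the pattern only, not the SDK's implementation: submit_with_fallback and the two stand-in callables are hypothetical, and the result dictionary merely mirrors the status, method and error fields documented above.

```python
from typing import Any, Callable, Dict


def submit_with_fallback(
    send_grpc: Callable[[], None], send_http: Callable[[], None]
) -> Dict[str, Any]:
    """Try the gRPC sender first; on any failure, try the HTTP sender."""
    try:
        send_grpc()
        return {"status": "success", "method": "gRPC"}
    except Exception as grpc_error:
        try:
            send_http()
            return {"status": "success", "method": "HTTP"}
        except Exception as http_error:
            return {
                "status": "error",
                "error": f"gRPC: {grpc_error}; HTTP: {http_error}",
            }


def failing_grpc() -> None:
    # Stand-in for a provider that does not accept gRPC.
    raise ConnectionError("gRPC unavailable")


def working_http() -> None:
    # Stand-in for a successful HTTP PUT.
    return None


print(submit_with_fallback(failing_grpc, working_http))
```

Keeping both error messages in the failure case makes it easier to tell whether the provider was unreachable or rejected the manifest.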

Authentication:

Like the log functions, manifest submission requires mTLS certificates. The SDK automatically uses the certificates in the client store, which are created by create_certificate().