Deployment module¶
The deployment module provides functionality for creating and managing deployments on the Akash Network.
Transaction parameters
All transaction functions support optional parameters like fee_amount
, gas_limit
, and gas_adjustment
. See Transaction parameters for details.
DeploymentClient class¶
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
deployment = client.deployment
Transactions¶
create_deployment()¶
Create deployment from SDL YAML.
def create_deployment(wallet, sdl_yaml: str, deposit: str = "5000000",
deposit_denom: str = "uakt", version: str = None,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet
(AkashWallet
): Wallet instance for signing transactionsdl_yaml
(str
): SDL YAML content as string
Optional parameters:
deposit
(str
): Deposit amount (default: "5000000")deposit_denom
(str
): Deposit token denomination (default: "uakt")version
(str
): Deployment version (calculated from SDL if not provided)memo
(str
): Transaction memo (default: "")fee_amount
(str
): Transaction fee amount in uakt (default: None)gas_limit
(int
): Gas limit override (default: None)gas_adjustment
(float
): Multiplier for gas estimation (default: 1.2)use_simulation
(bool
): Enable gas simulation (default: True)
Returns: BroadcastResult
- Transaction result with the following fields:
tx_hash
(str
): Transaction hashcode
(int
): Response code (0 for success)raw_log
(str
): Raw transaction log/error messagesuccess
(bool
): Whether transaction succeededevents
(List[Dict]
): Transaction eventstype
(str
): Event typeattributes
(List[Dict]
): Event attributeskey
(str
): Attribute keyvalue
(str
): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
sdl_yaml = '''
version: "2.0"
services:
web:
image: nginx:latest
expose:
- port: 80
as: 80
to:
- global: true
profiles:
compute:
web:
resources:
cpu:
units: 0.1
memory:
size: 128Mi
storage:
size: 512Mi
placement:
akash:
pricing:
web:
denom: uakt
amount: 1000
deployment:
web:
akash:
profile: web
count: 1
'''
result = client.deployment.create_deployment(
wallet=wallet,
sdl_yaml=sdl_yaml,
deposit="5000000"
)
if result.success:
print(f"Deployment created: {result.tx_hash}")
else:
print(f"Failed to create deployment: {result.raw_log}")
⚠️ Note: To access deployment logs later, you'll need a client certificate. Create one with client.cert.create_certificate(wallet)
before deploying. See Certificate documentation for details.
Using USDC deposits:
The SDK supports USDC and AKT. The deposit_denom
parameter must match the denom
specified in your SDL pricing section.
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443", "akashnet-2")
USDC_DENOM = "ibc/170C677610AC31DF0904FFE09CD3B5C657492170E7E52372E48756B71E56F2F1"
sdl_yaml = '''
version: "2.0"
services:
web:
image: nginx:latest
expose:
- port: 80
as: 80
to:
- global: true
profiles:
compute:
web:
resources:
cpu:
units: 0.1
memory:
size: 128Mi
storage:
size: 512Mi
placement:
akash:
pricing:
web:
denom: ibc/170C677610AC31DF0904FFE09CD3B5C657492170E7E52372E48756B71E56F2F1
amount: 500000
deployment:
web:
akash:
profile: web
count: 1
'''
result = client.deployment.create_deployment(
wallet=wallet,
sdl_yaml=sdl_yaml,
deposit="500000",
deposit_denom=USDC_DENOM,
fee_amount="7000"
)
if result.success:
print(f"Deployment created: {result.tx_hash}")
else:
print(f"Failed to create deployment: {result.raw_log}")
⚠️ Important: The deposit_denom
must exactly match the denomination in your SDL pricing
section. Transaction fees (fee_amount
) are always paid in uakt regardless of deployment denomination.
update_deployment()¶
Update deployment with SDL YAML content.
def update_deployment(wallet, sdl_yaml: str, owner: str = None, dseq: int = None,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet
(AkashWallet
): Wallet instance for signing transactionsdl_yaml
(str
): SDL YAML content as stringdseq
(int
): Deployment sequence number
Optional parameters:
owner
(str
): Deployment owner address (default: wallet.address)memo
(str
): Transaction memo (default: "")fee_amount
(str
): Transaction fee amount in uakt (default: None)gas_limit
(int
): Gas limit override (default: None)gas_adjustment
(float
): Multiplier for gas estimation (default: 1.2)use_simulation
(bool
): Enable gas simulation (default: True)
Returns: BroadcastResult
- Transaction result with the following fields:
tx_hash
(str
): Transaction hashcode
(int
): Response code (0 for success)raw_log
(str
): Raw transaction log/error messagesuccess
(bool
): Whether transaction succeededevents
(List[Dict]
): Transaction eventstype
(str
): Event typeattributes
(List[Dict]
): Event attributeskey
(str
): Attribute keyvalue
(str
): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
updated_sdl_yaml = '''
version: "2.0"
services:
web:
image: nginx:1.21
command: ["/bin/sh", "-c", "echo 'Updated container' && nginx -g 'daemon off;'"]
env:
- NGINX_WORKER_PROCESSES=auto
- UPDATE_VERSION=v2
expose:
- port: 80
as: 80
to:
- global: true
profiles:
compute:
web:
resources:
cpu:
units: 0.2
memory:
size: 256Mi
storage:
size: 1Gi
placement:
akash:
pricing:
web:
denom: uakt
amount: 1500
deployment:
web:
akash:
profile: web
count: 1
'''
result = client.deployment.update_deployment(
wallet=wallet,
sdl_yaml=updated_sdl_yaml,
dseq=12345
)
if result.success:
print(f"Deployment updated: {result.tx_hash}")
else:
print(f"Update failed: {result.raw_log}")
close_deployment()¶
Close deployment and return deposit.
def close_deployment(wallet, owner: str, dseq: int,
memo: str = "", fee_amount: str = None,
gas_limit: int = None, gas_adjustment: float = 1.2,
use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet
(AkashWallet
): Wallet instance for signing transactionowner
(str
): Deployment owner addressdseq
(int
): Deployment sequence number
Optional parameters:
memo
(str
): Transaction memo (default: "")fee_amount
(str
): Fee amount in uakt (default: None)gas_limit
(int
): Gas limit override (default: None)gas_adjustment
(float
): Multiplier for simulated gas (default: 1.2)use_simulation
(bool
): Enable/disable gas simulation (default: True)
Returns: BroadcastResult
- Transaction result with the following fields:
tx_hash
(str
): Transaction hashcode
(int
): Response code (0 for success)raw_log
(str
): Raw transaction log/error messagesuccess
(bool
): Whether transaction succeededevents
(List[Dict]
): Transaction eventstype
(str
): Event typeattributes
(List[Dict]
): Event attributeskey
(str
): Attribute keyvalue
(str
): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.close_deployment(
wallet=wallet,
owner=wallet.address,
dseq=12345
)
if result.success:
print(f"Deployment closed: {result.tx_hash}")
deposit_deployment()¶
Add funds to deployment deposit account.
def deposit_deployment(wallet, owner: str, dseq: int, amount: str,
denom: str = "uakt", memo: str = "",
fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet
(AkashWallet
): Wallet for signing transactionowner
(str
): Deployment owner addressdseq
(int
): Deployment sequence numberamount
(str
): Amount to deposit
Optional parameters:
denom
(str
): Token denomination (default: "uakt")memo
(str
): Transaction memo (default: "")fee_amount
(str
): Transaction fee amount in uakt (default: None)gas_limit
(int
): Gas limit override (default: None)gas_adjustment
(float
): Multiplier for gas estimation (default: 1.2)use_simulation
(bool
): Enable gas simulation (default: True)
Returns: BroadcastResult
- Transaction result with the following fields:
tx_hash
(str
): Transaction hashcode
(int
): Response code (0 for success)raw_log
(str
): Raw transaction log/error messagesuccess
(bool
): Whether transaction succeededevents
(List[Dict]
): Transaction eventstype
(str
): Event typeattributes
(List[Dict]
): Event attributeskey
(str
): Attribute keyvalue
(str
): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.deposit_deployment(
wallet=wallet,
owner=wallet.address,
dseq=12345,
amount="2000000"
)
if result.success:
print(f"Deposit added: {result.tx_hash}")
close_group()¶
Close a specific group within a deployment.
def close_group(wallet, owner: str, dseq: int, gseq: int,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet
(AkashWallet
): Wallet for signing transactionowner
(str
): Deployment owner addressdseq
(int
): Deployment sequence numbergseq
(int
): Group sequence number
Optional parameters:
memo
(str
): Transaction memo (default: "")fee_amount
(str
): Transaction fee amount in uakt (default: None)gas_limit
(int
): Gas limit override (default: None)gas_adjustment
(float
): Multiplier for gas estimation (default: 1.2)use_simulation
(bool
): Enable gas simulation (default: True)
Returns: BroadcastResult
- Transaction result with the following fields:
tx_hash
(str
): Transaction hashcode
(int
): Response code (0 for success)raw_log
(str
): Raw transaction log/error messagesuccess
(bool
): Whether transaction succeededevents
(List[Dict]
): Transaction eventstype
(str
): Event typeattributes
(List[Dict]
): Event attributeskey
(str
): Attribute keyvalue
(str
): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.close_group(
wallet=wallet,
owner=wallet.address,
dseq=12345,
gseq=1
)
if result.success:
print(f"Group closed: {result.tx_hash}")
pause_group()¶
Pause a specific group within a deployment.
def pause_group(wallet, owner: str, dseq: int, gseq: int,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet
(AkashWallet
): Wallet for signing transactionowner
(str
): Deployment owner addressdseq
(int
): Deployment sequence numbergseq
(int
): Group sequence number
Optional parameters:
memo
(str
): Transaction memo (default: "")fee_amount
(str
): Transaction fee amount in uakt (default: None)gas_limit
(int
): Gas limit override (default: None)gas_adjustment
(float
): Multiplier for gas estimation (default: 1.2)use_simulation
(bool
): Enable gas simulation (default: True)
Returns: BroadcastResult
- Transaction result with the following fields:
tx_hash
(str
): Transaction hashcode
(int
): Response code (0 for success)raw_log
(str
): Raw transaction log/error messagesuccess
(bool
): Whether transaction succeededevents
(List[Dict]
): Transaction eventstype
(str
): Event typeattributes
(List[Dict]
): Event attributeskey
(str
): Attribute keyvalue
(str
): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.pause_group(
wallet=wallet,
owner=wallet.address,
dseq=12345,
gseq=1
)
if result.success:
print(f"Group paused: {result.tx_hash}")
start_group()¶
Start a specific group within a deployment.
def start_group(wallet, owner: str, dseq: int, gseq: int,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet
(AkashWallet
): Wallet for signing transactionowner
(str
): Deployment owner addressdseq
(int
): Deployment sequence numbergseq
(int
): Group sequence number
Optional parameters:
memo
(str
): Transaction memo (default: "")fee_amount
(str
): Transaction fee amount in uakt (default: None)gas_limit
(int
): Gas limit override (default: None)gas_adjustment
(float
): Multiplier for gas estimation (default: 1.2)use_simulation
(bool
): Enable gas simulation (default: True)
Returns: BroadcastResult
- Transaction result with the following fields:
tx_hash
(str
): Transaction hashcode
(int
): Response code (0 for success)raw_log
(str
): Raw transaction log/error messagesuccess
(bool
): Whether transaction succeededevents
(List[Dict]
): Transaction eventstype
(str
): Event typeattributes
(List[Dict]
): Event attributeskey
(str
): Attribute keyvalue
(str
): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.start_group(
wallet=wallet,
owner=wallet.address,
dseq=12345,
gseq=1
)
if result.success:
print(f"Group started: {result.tx_hash}")
Queries¶
get_deployments()¶
List deployments with optional filtering.
def get_deployments(owner: Optional[str] = None, limit: Optional[int] = None,
offset: Optional[int] = None, count_total: bool = False) -> List[Dict[str, Any]]
Optional parameters:
owner
(str
): Deployment owner filter (default: None)limit
(int
): Maximum number of results to return (default: None)offset
(int
): Number of results to skip (default: None)count_total
(bool
): Include total count in response (default: False)
Returns: List[Dict[str, Any]]
- List of deployments with the following structure:
deployment
(Dict
): Deployment informationdeployment_id
(Dict
): Deployment identifierowner
(str
): Deployment owner addressdseq
(int
): Deployment sequence number
state
(int
): Deployment state (1=active, 2=closed)version
(str
): Deployment version hashcreated_at
(int
): Block height when deployment was created
groups
(List[Dict]
): List of groups in deploymentgroup_id
(Dict
): Group identifierowner
(str
): Deployment owner addressdseq
(int
): Deployment sequence numbergseq
(int
): Group sequence number
state
(int
): Group state (1=open, 2=paused, 3=insufficient_funds, 4=closed)name
(str
): Group namecreated_at
(int
): Block height when group was created
escrow_account
(Dict
): Escrow account informationbalance
(str
): Account balance in AKTraw_balance
(str
): Account balance in uakt (micro AKT)state
(int
): Account state (1=active, 2=closed)transferred
(Dict
): Transferred amountdenom
(str
): Token denominationamount
(str
): Transferred amount
settled_at
(int
): Settlement timestamp
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
deployments = client.deployment.get_deployments(limit=10)
print(f"Found {len(deployments)} deployments")
for deployment in deployments:
dseq = deployment['deployment']['deployment_id']['dseq']
owner = deployment['deployment']['deployment_id']['owner']
state = deployment['deployment']['state']
print(f"DSEQ {dseq} (Owner: {owner}) - State: {state}")
my_deployments = client.deployment.get_deployments(
owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
limit=5
)
print(f"Found {len(my_deployments)} deployments for owner")
get_deployment()¶
Get deployment information.
Required parameters:
owner
(str
): Deployment owner addressdseq
(int
): Deployment sequence number
Returns: Dict[str, Any]
- Deployment information with the following structure:
deployment
(Dict
): Deployment informationdeployment_id
(Dict
): Deployment identifierowner
(str
): Deployment owner addressdseq
(int
): Deployment sequence number
state
(int
): Deployment state (1=active, 2=closed)version
(str
): Deployment version hashcreated_at
(int
): Block height when deployment was created
groups
(List[Dict]
): List of groups in deploymentgroup_id
(Dict
): Group identifierowner
(str
): Deployment owner addressdseq
(int
): Deployment sequence numbergseq
(int
): Group sequence number
state
(int
): Group state (1=open, 2=paused, 3=insufficient_funds, 4=closed)name
(str
): Group name
escrow_account
(Dict
): Escrow account informationbalance
(str
): Account balance in AKTraw_balance
(str
): Account balance in uakt (micro AKT)state
(int
): Account state (1=active, 2=closed)transferred
(Dict
): Transferred amountdenom
(str
): Token denominationamount
(str
): Transferred amount
settled_at
(int
): Settlement timestamp
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
deployment = client.deployment.get_deployment(
owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
dseq=12345
)
if deployment:
dep_info = deployment['deployment']
dep_id = dep_info['deployment_id']
escrow = deployment['escrow_account']
print(f"Deployment {dep_id['dseq']}:")
print(f"Owner: {dep_id['owner']}")
print(f"State: {dep_info['state']} (1=active, 2=closed)")
print(f"Version: {dep_info['version']}")
print(f"Escrow Balance: {escrow['balance']} AKT")
print(f"Groups: {len(deployment['groups'])}")
for group in deployment['groups']:
gseq = group['group_id']['gseq']
state = group['state']
name = group['name']
print(f"Group {gseq}: {name} (state {state})")
else:
print("Deployment not found")
Utils¶
validate_sdl()¶
Validate SDL (Stack Definition Language) structure and contents.
Required parameters:
sdl
(Dict[str, Any]
): SDL configuration dictionary
Returns: Dict[str, Any]
- Validation result with the following structure:
valid
(bool
): Whether SDL is validerrors
(List[str]
): List of validation errorswarnings
(List[str]
): List of validation warningsdetails
(Dict
): Detailed validation informationversion
(str
): SDL version from inputservices
(List[str]
): List of service names from services sectionresources
(Dict
): Resource specifications by service nameservice_name
(Dict
): Service resource detailscpu
(str
): CPU units value or "N/A"memory
(str
): Memory size value or "N/A"storage
(str
): Storage size value or "N/A"
pricing
(Dict
): Pricing specifications by placement nameplacement_name
(Dict
): Placement pricing detailsservice_name
(str
): Formatted pricing string "amount denom"
deployment
(Dict
): Deployment configuration by service nameservice_name
(Dict
): Placement configuration for serviceplacement_name
(Dict
): Placement detailsprofile
(str
): Profile name to usecount
(int
): Number of instances
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
sdl = {
"version": "2.0",
"services": {
"web": {
"image": "nginx:latest",
"expose": [{"port": 80, "as": 80, "to": [{"global": True}]}]
}
},
"profiles": {
"compute": {
"web": {
"resources": {
"cpu": {"units": "0.1"},
"memory": {"size": "128Mi"},
"storage": {"size": "512Mi"}
}
}
},
"placement": {
"akash": {
"pricing": {
"web": {"denom": "uakt", "amount": "1000"}
}
}
}
},
"deployment": {
"web": {"akash": {"profile": "web", "count": 1}}
}
}
validation = client.deployment.validate_sdl(sdl)
if validation['valid']:
print("SDL is valid")
print(f"Services: {validation['details']['services']}")
else:
print("SDL validation failed:")
for error in validation['errors']:
print(f"Error: {error}")
if validation['warnings']:
print("Warnings:")
for warning in validation['warnings']:
print(f"Warning: {warning}")
get_service_logs()¶
Get logs from a service running in your deployment using WebSocket streaming.
def get_service_logs(provider_endpoint: str, lease_id: Dict[str, Any],
service_name: str = None, tail: int = 100,
cert_pem: str = None, key_pem: str = None, timeout: int = 30) -> List[str]
Required parameters:
provider_endpoint
(str
): Provider's HTTPS endpoint (e.g., "provider.akash.network:8443")lease_id
(Dict
): Lease identifier dictionary with the following fields:owner
(str
): Deployment owner addressdseq
(int
): Deployment sequence numbergseq
(int
): Group sequence numberoseq
(int
): Order sequence numberprovider
(str
): Provider address
Optional parameters:
service_name
(str
): Specific service name to get logs from (default: None for all services)tail
(int
): Number of recent log lines to retrieve (default: 100)cert_pem
(str
): Client certificate in PEM format for mTLS authentication (default: auto-load from client store)key_pem
(str
): Client private key in PEM format for mTLS authentication (default: auto-load from client store)timeout
(int
): WebSocket connection timeout in seconds (default: 30)
Returns: List[str]
- List of log lines from the service(s)
⚠️ Note: Not all providers capture application logs. Ensure your deployment produces logs and the provider supports log capture before expecting results.
Example:
from akash import AkashClient, AkashWallet
client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")
lease_id = {
"owner": wallet.address,
"dseq": 12345,
"gseq": 1,
"oseq": 1,
"provider": "akash1..."
}
logs = client.deployment.get_service_logs(
provider_endpoint="provider.europlots.com:8443",
lease_id=lease_id,
service_name="web",
tail=50
)
print(f"Retrieved {len(logs)} log lines:")
for line in logs:
print(line)
stream_service_logs()¶
Stream logs from a service running in your deployment in real-time using WebSocket streaming.
def stream_service_logs(provider_endpoint: str, lease_id: Dict[str, Any],
service_name: str = None, follow: bool = True,
cert_pem: str = None, key_pem: str = None, timeout: int = 300)
Required parameters:
provider_endpoint
(str
): Provider's HTTPS endpoint (e.g., "provider.akash.network:8443")lease_id
(Dict
): Lease identifier dictionary with the following fields:owner
(str
): Deployment owner addressdseq
(int
): Deployment sequence numbergseq
(int
): Group sequence numberoseq
(int
): Order sequence numberprovider
(str
): Provider address
Optional parameters:
service_name
(str
): Specific service name to stream logs from (default: None for all services)follow
(bool
): Whether to keep streaming new logs liketail -f
(default: True)cert_pem
(str
): Client certificate in PEM format for mTLS authentication (default: auto-load from client store)key_pem
(str
): Client private key in PEM format for mTLS authentication (default: auto-load from client store)timeout
(int
): Maximum streaming duration in seconds (default: 300)
Yields: str
- Log lines as they are received from the service in real-time
⚠️ Note: Not all providers capture application logs. The stream will continue indefinitely if follow=True
until timeout or manual interruption.
Deployment owners can access their deployment logs with valid certificates.
Example:
from akash import AkashClient, AkashWallet
client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")
lease_id = {
"owner": wallet.address,
"dseq": 12345,
"gseq": 1,
"oseq": 1,
"provider": "akash1..."
}
print("Streaming logs (Ctrl+C to stop)...")
for log_line in client.deployment.stream_service_logs(
provider_endpoint="provider.europlots.com:8443",
lease_id=lease_id,
service_name="web",
follow=True
):
print(f"[LOG] {log_line}")
Deployment log functionality requires mTLS authentication using client certificates. See Certificate documentation for setup details. The SDK automatically uses certificates from the client store.
Manifest submission¶
submit_manifest_to_provider()¶
Submit deployment manifest to provider with automatic protocol fallback.
def submit_manifest_to_provider(provider_endpoint: str, lease_id: Dict[str, Any],
manifest: Dict[str, Any], cert_pem: str = None,
key_pem: str = None, timeout: int = 60,
use_http: bool = False) -> Dict[str, Any]
Required parameters:
provider_endpoint
(str
): Provider endpoint (e.g., "provider.akash.network:8443")lease_id
(Dict[str, Any]
): Lease identifier dictionaryowner
(str
, required): Deployment owner addressdseq
(str
orint
, required): Deployment sequence numbergseq
(str
orint
, required): Group sequence numberoseq
(str
orint
, required): Order sequence numberprovider
(str
, required): Provider address
manifest
(Dict[str, Any]
): Manifest data dictionaryversion
(str
, required): Manifest version (e.g., "2.0")services
(Dict[str, Dict]
, required): Service definitions (keys are service names like "web", "api")image
(str
, required): Docker image nameexpose
(List[Dict]
, optional): Port exposure configurationport
(int
, required): Internal port numberas
(int
, required): External port numberto
(List[Dict]
, required): Exposure targetsglobal
(bool
, optional): Expose globallyservice
(str
, optional): Expose to specific service
env
(List[str]
, optional): Environment variablescommand
(List[str]
, optional): Container command override
profiles
(Dict[str, Dict]
, required): Resource and placement profilescompute
(Dict[str, Dict]
, required): Compute resource profiles (keys are profile names like "web", "api")resources
(Dict
, required): Resource requirementscpu
(Dict
, required): CPU specificationunits
(str
orfloat
, required): CPU units
memory
(Dict
, required): Memory specificationquantity
(Dict
, required): Memory quantityval
(str
, required): Memory value (e.g., "256Mi")
storage
(Dict
orList[Dict]
, required): Storage specificationquantity
(Dict
, required): Storage quantity (if single)val
(str
, required): Storage value (e.g., "1Gi")
size
(str
, optional): Storage size (alternative format)attributes
(Dict
, optional): Storage attributespersistent
(str
, optional): Persistence settingclass
(str
, optional): Storage class
placement
(Dict[str, Dict]
, required): Placement profiles (keys are placement names like "akash")attributes
(Dict[str, str]
, optional): Provider attribute requirements as key-value pairspricing
(Dict[str, Dict]
, required): Pricing for services (keys are service names)denom
(str
, required): Token denomination (e.g., "uakt")amount
(str
, required): Price amount
deployment
(Dict[str, Dict]
, required): Deployment mapping (keys are service names)profile
(str
, required): Compute profile namecount
(int
, required): Instance count
Optional parameters:
cert_pem
(str
): Client certificate PEM (default: auto-load from cert store)key_pem
(str
): Client private key PEM (default: auto-load from cert store)timeout
(int
): Request timeout in seconds (default: 60)use_http
(bool
): Force HTTP method instead of gRPC first (default: False)
Returns: Dict[str, Any]
- Submission result with the following fields:
status
(str
): Submission status ("success" or "error")method
(str
, optional): Protocol used ("gRPC" or "HTTP")error
(str
, optional): Error message if status is "error"
Example:
from akash import AkashClient, AkashWallet
client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")
lease_id = {
"owner": wallet.address,
"dseq": 12345,
"gseq": 1,
"oseq": 1,
"provider": provider_address
}
manifest = {
"version": "2.0",
"services": {
"web": {
"image": "nginx:latest",
"expose": [{"port": 80, "as": 80, "to": [{"global": True}]}]
}
},
"profiles": {
"compute": {
"web": {
"resources": {
"cpu": {"units": 0.1},
"memory": {"quantity": {"val": "512Mi"}},
"storage": {"quantity": {"val": "1Gi"}}
}
}
},
"placement": {
"akash": {"pricing": {"web": {"denom": "uakt", "amount": "100"}}}
}
},
"deployment": {
"web": {"akash": {"profile": "web", "count": 1}}
}
}
result = client.deployment.submit_manifest_to_provider(
provider_endpoint=provider_endpoint,
lease_id=lease_id,
manifest=manifest
)
if result["status"] == "success":
print(f"Manifest submitted via {result.get('method', 'gRPC')}")
else:
print(f"Manifest submission failed: {result['error']}")
Protocol fallback:
The method automatically tries gRPC first, then falls back to HTTP PUT if gRPC fails. This matches the console approach and provides maximum provider compatibility. Set use_http=True
to force HTTP method.
Authentication:
Like log functions, manifest submission requires mTLS certificates. The SDK automatically uses certificates from the client store created during create_certificate()
.