Deployment module¶
The deployment module provides functionality for creating and managing deployments on the Akash Network.
Transaction parameters
All transaction functions support optional parameters like fee_amount, gas_limit, and gas_adjustment. See Transaction parameters for details.
DeploymentClient class¶
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
deployment = client.deployment
Transactions¶
create_deployment()¶
Create deployment from SDL YAML.
def create_deployment(wallet, sdl_yaml: str, deposit: str = "5000000",
deposit_denom: str = "uakt", version: str = None,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet(AkashWallet): Wallet instance for signing transactionsdl_yaml(str): SDL YAML content as string
Optional parameters:
deposit(str): Deposit amount (default: "5000000")deposit_denom(str): Deposit token denomination (default: "uakt")version(str): Deployment version (calculated from SDL if not provided)memo(str): Transaction memo (default: "")fee_amount(str): Transaction fee amount in uakt (default: None)gas_limit(int): Gas limit override (default: None)gas_adjustment(float): Multiplier for gas estimation (default: 1.2)use_simulation(bool): Enable gas simulation (default: True)
Returns: BroadcastResult - Transaction result with the following fields:
tx_hash(str): Transaction hashcode(int): Response code (0 for success)raw_log(str): Raw transaction log/error messagesuccess(bool): Whether transaction succeededevents(List[Dict]): Transaction eventstype(str): Event typeattributes(List[Dict]): Event attributeskey(str): Attribute keyvalue(str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
sdl_yaml = '''
version: "2.0"
services:
web:
image: nginx:latest
expose:
- port: 80
as: 80
to:
- global: true
profiles:
compute:
web:
resources:
cpu:
units: 0.1
memory:
size: 128Mi
storage:
size: 512Mi
placement:
akash:
pricing:
web:
denom: uakt
amount: 1000
deployment:
web:
akash:
profile: web
count: 1
'''
result = client.deployment.create_deployment(
wallet=wallet,
sdl_yaml=sdl_yaml,
deposit="5000000"
)
if result.success:
print(f"Deployment created: {result.tx_hash}")
else:
print(f"Failed to create deployment: {result.raw_log}")
⚠️ Note: To access deployment logs later, you'll need a client certificate. Create one with client.cert.create_certificate(wallet) before deploying. See Certificate documentation for details.
Using USDC deposits:
The SDK supports USDC and AKT. The deposit_denom parameter must match the denom specified in your SDL pricing section.
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443", "akashnet-2")
USDC_DENOM = "ibc/170C677610AC31DF0904FFE09CD3B5C657492170E7E52372E48756B71E56F2F1"
sdl_yaml = '''
version: "2.0"
services:
web:
image: nginx:latest
expose:
- port: 80
as: 80
to:
- global: true
profiles:
compute:
web:
resources:
cpu:
units: 0.1
memory:
size: 128Mi
storage:
size: 512Mi
placement:
akash:
pricing:
web:
denom: ibc/170C677610AC31DF0904FFE09CD3B5C657492170E7E52372E48756B71E56F2F1
amount: 500000
deployment:
web:
akash:
profile: web
count: 1
'''
result = client.deployment.create_deployment(
wallet=wallet,
sdl_yaml=sdl_yaml,
deposit="500000",
deposit_denom=USDC_DENOM,
fee_amount="7000"
)
if result.success:
print(f"Deployment created: {result.tx_hash}")
else:
print(f"Failed to create deployment: {result.raw_log}")
⚠️ Important: The deposit_denom must exactly match the denomination in your SDL pricing section. Transaction fees (fee_amount) are always paid in uakt regardless of deployment denomination.
update_deployment()¶
Update deployment with SDL YAML content.
def update_deployment(wallet, sdl_yaml: str, owner: str = None, dseq: int = None,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet(AkashWallet): Wallet instance for signing transactionsdl_yaml(str): SDL YAML content as stringdseq(int): Deployment sequence number
Optional parameters:
owner(str): Deployment owner address (default: wallet.address)memo(str): Transaction memo (default: "")fee_amount(str): Transaction fee amount in uakt (default: None)gas_limit(int): Gas limit override (default: None)gas_adjustment(float): Multiplier for gas estimation (default: 1.2)use_simulation(bool): Enable gas simulation (default: True)
Returns: BroadcastResult - Transaction result with the following fields:
tx_hash(str): Transaction hashcode(int): Response code (0 for success)raw_log(str): Raw transaction log/error messagesuccess(bool): Whether transaction succeededevents(List[Dict]): Transaction eventstype(str): Event typeattributes(List[Dict]): Event attributeskey(str): Attribute keyvalue(str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
updated_sdl_yaml = '''
version: "2.0"
services:
web:
image: nginx:1.21
command: ["/bin/sh", "-c", "echo 'Updated container' && nginx -g 'daemon off;'"]
env:
- NGINX_WORKER_PROCESSES=auto
- UPDATE_VERSION=v2
expose:
- port: 80
as: 80
to:
- global: true
profiles:
compute:
web:
resources:
cpu:
units: 0.2
memory:
size: 256Mi
storage:
size: 1Gi
placement:
akash:
pricing:
web:
denom: uakt
amount: 1500
deployment:
web:
akash:
profile: web
count: 1
'''
result = client.deployment.update_deployment(
wallet=wallet,
sdl_yaml=updated_sdl_yaml,
dseq=12345
)
if result.success:
print(f"Deployment updated: {result.tx_hash}")
else:
print(f"Update failed: {result.raw_log}")
close_deployment()¶
Close deployment and return deposit.
def close_deployment(wallet, owner: str, dseq: int,
memo: str = "", fee_amount: str = None,
gas_limit: int = None, gas_adjustment: float = 1.2,
use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet(AkashWallet): Wallet instance for signing transactionowner(str): Deployment owner addressdseq(int): Deployment sequence number
Optional parameters:
memo(str): Transaction memo (default: "")fee_amount(str): Fee amount in uakt (default: None)gas_limit(int): Gas limit override (default: None)gas_adjustment(float): Multiplier for simulated gas (default: 1.2)use_simulation(bool): Enable/disable gas simulation (default: True)
Returns: BroadcastResult - Transaction result with the following fields:
tx_hash(str): Transaction hashcode(int): Response code (0 for success)raw_log(str): Raw transaction log/error messagesuccess(bool): Whether transaction succeededevents(List[Dict]): Transaction eventstype(str): Event typeattributes(List[Dict]): Event attributeskey(str): Attribute keyvalue(str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.close_deployment(
wallet=wallet,
owner=wallet.address,
dseq=12345
)
if result.success:
print(f"Deployment closed: {result.tx_hash}")
deposit_deployment()¶
Add funds to deployment deposit account.
def deposit_deployment(wallet, owner: str, dseq: int, amount: str,
denom: str = "uakt", memo: str = "",
fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet(AkashWallet): Wallet for signing transactionowner(str): Deployment owner addressdseq(int): Deployment sequence numberamount(str): Amount to deposit
Optional parameters:
denom(str): Token denomination (default: "uakt")memo(str): Transaction memo (default: "")fee_amount(str): Transaction fee amount in uakt (default: None)gas_limit(int): Gas limit override (default: None)gas_adjustment(float): Multiplier for gas estimation (default: 1.2)use_simulation(bool): Enable gas simulation (default: True)
Returns: BroadcastResult - Transaction result with the following fields:
tx_hash(str): Transaction hashcode(int): Response code (0 for success)raw_log(str): Raw transaction log/error messagesuccess(bool): Whether transaction succeededevents(List[Dict]): Transaction eventstype(str): Event typeattributes(List[Dict]): Event attributeskey(str): Attribute keyvalue(str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.deposit_deployment(
wallet=wallet,
owner=wallet.address,
dseq=12345,
amount="2000000"
)
if result.success:
print(f"Deposit added: {result.tx_hash}")
close_group()¶
Close a specific group within a deployment.
def close_group(wallet, owner: str, dseq: int, gseq: int,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet(AkashWallet): Wallet for signing transactionowner(str): Deployment owner addressdseq(int): Deployment sequence numbergseq(int): Group sequence number
Optional parameters:
memo(str): Transaction memo (default: "")fee_amount(str): Transaction fee amount in uakt (default: None)gas_limit(int): Gas limit override (default: None)gas_adjustment(float): Multiplier for gas estimation (default: 1.2)use_simulation(bool): Enable gas simulation (default: True)
Returns: BroadcastResult - Transaction result with the following fields:
tx_hash(str): Transaction hashcode(int): Response code (0 for success)raw_log(str): Raw transaction log/error messagesuccess(bool): Whether transaction succeededevents(List[Dict]): Transaction eventstype(str): Event typeattributes(List[Dict]): Event attributeskey(str): Attribute keyvalue(str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.close_group(
wallet=wallet,
owner=wallet.address,
dseq=12345,
gseq=1
)
if result.success:
print(f"Group closed: {result.tx_hash}")
pause_group()¶
Pause a specific group within a deployment.
def pause_group(wallet, owner: str, dseq: int, gseq: int,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet(AkashWallet): Wallet for signing transactionowner(str): Deployment owner addressdseq(int): Deployment sequence numbergseq(int): Group sequence number
Optional parameters:
memo(str): Transaction memo (default: "")fee_amount(str): Transaction fee amount in uakt (default: None)gas_limit(int): Gas limit override (default: None)gas_adjustment(float): Multiplier for gas estimation (default: 1.2)use_simulation(bool): Enable gas simulation (default: True)
Returns: BroadcastResult - Transaction result with the following fields:
tx_hash(str): Transaction hashcode(int): Response code (0 for success)raw_log(str): Raw transaction log/error messagesuccess(bool): Whether transaction succeededevents(List[Dict]): Transaction eventstype(str): Event typeattributes(List[Dict]): Event attributeskey(str): Attribute keyvalue(str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.pause_group(
wallet=wallet,
owner=wallet.address,
dseq=12345,
gseq=1
)
if result.success:
print(f"Group paused: {result.tx_hash}")
start_group()¶
Start a specific group within a deployment.
def start_group(wallet, owner: str, dseq: int, gseq: int,
memo: str = "", fee_amount: str = None, gas_limit: int = None,
gas_adjustment: float = 1.2, use_simulation: bool = True) -> BroadcastResult
Required parameters:
wallet(AkashWallet): Wallet for signing transactionowner(str): Deployment owner addressdseq(int): Deployment sequence numbergseq(int): Group sequence number
Optional parameters:
memo(str): Transaction memo (default: "")fee_amount(str): Transaction fee amount in uakt (default: None)gas_limit(int): Gas limit override (default: None)gas_adjustment(float): Multiplier for gas estimation (default: 1.2)use_simulation(bool): Enable gas simulation (default: True)
Returns: BroadcastResult - Transaction result with the following fields:
tx_hash(str): Transaction hashcode(int): Response code (0 for success)raw_log(str): Raw transaction log/error messagesuccess(bool): Whether transaction succeededevents(List[Dict]): Transaction eventstype(str): Event typeattributes(List[Dict]): Event attributeskey(str): Attribute keyvalue(str): Attribute value
Example:
from akash import AkashClient, AkashWallet
wallet = AkashWallet.from_mnemonic("your mnemonic here")
client = AkashClient("https://akash-rpc.polkachu.com:443")
result = client.deployment.start_group(
wallet=wallet,
owner=wallet.address,
dseq=12345,
gseq=1
)
if result.success:
print(f"Group started: {result.tx_hash}")
Queries¶
get_deployments()¶
List deployments with optional filtering.
def get_deployments(owner: Optional[str] = None, limit: Optional[int] = None,
offset: Optional[int] = None, count_total: bool = False) -> List[Dict[str, Any]]
Optional parameters:
owner(str): Deployment owner filter (default: None)limit(int): Maximum number of results to return (default: None)offset(int): Number of results to skip (default: None)count_total(bool): Include total count in response (default: False)
Returns: List[Dict[str, Any]] - List of deployments with the following structure:
deployment(Dict): Deployment informationdeployment_id(Dict): Deployment identifierowner(str): Deployment owner addressdseq(int): Deployment sequence number
state(int): Deployment state (1=active, 2=closed)version(str): Deployment version hashcreated_at(int): Block height when deployment was created
groups(List[Dict]): List of groups in deploymentgroup_id(Dict): Group identifierowner(str): Deployment owner addressdseq(int): Deployment sequence numbergseq(int): Group sequence number
state(int): Group state (1=open, 2=paused, 3=insufficient_funds, 4=closed)name(str): Group namecreated_at(int): Block height when group was created
escrow_account(Dict): Escrow account informationbalance(str): Account balance in AKTraw_balance(str): Account balance in uakt (micro AKT)state(int): Account state (1=active, 2=closed)transferred(Dict): Transferred amountdenom(str): Token denominationamount(str): Transferred amount
settled_at(int): Settlement timestamp
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
deployments = client.deployment.get_deployments(limit=10)
print(f"Found {len(deployments)} deployments")
for deployment in deployments:
dseq = deployment['deployment']['deployment_id']['dseq']
owner = deployment['deployment']['deployment_id']['owner']
state = deployment['deployment']['state']
print(f"DSEQ {dseq} (Owner: {owner}) - State: {state}")
my_deployments = client.deployment.get_deployments(
owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
limit=5
)
print(f"Found {len(my_deployments)} deployments for owner")
get_deployment()¶
Get deployment information.
Required parameters:
owner(str): Deployment owner addressdseq(int): Deployment sequence number
Returns: Dict[str, Any] - Deployment information with the following structure:
deployment(Dict): Deployment informationdeployment_id(Dict): Deployment identifierowner(str): Deployment owner addressdseq(int): Deployment sequence number
state(int): Deployment state (1=active, 2=closed)version(str): Deployment version hashcreated_at(int): Block height when deployment was created
groups(List[Dict]): List of groups in deploymentgroup_id(Dict): Group identifierowner(str): Deployment owner addressdseq(int): Deployment sequence numbergseq(int): Group sequence number
state(int): Group state (1=open, 2=paused, 3=insufficient_funds, 4=closed)name(str): Group name
escrow_account(Dict): Escrow account informationbalance(str): Account balance in AKTraw_balance(str): Account balance in uakt (micro AKT)state(int): Account state (1=active, 2=closed)transferred(Dict): Transferred amountdenom(str): Token denominationamount(str): Transferred amount
settled_at(int): Settlement timestamp
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
deployment = client.deployment.get_deployment(
owner="akash1qnnhhgzxj24f2kld5yhy4v4h4s9r295ak5gjw4",
dseq=12345
)
if deployment:
dep_info = deployment['deployment']
dep_id = dep_info['deployment_id']
escrow = deployment['escrow_account']
print(f"Deployment {dep_id['dseq']}:")
print(f"Owner: {dep_id['owner']}")
print(f"State: {dep_info['state']} (1=active, 2=closed)")
print(f"Version: {dep_info['version']}")
print(f"Escrow Balance: {escrow['balance']} AKT")
print(f"Groups: {len(deployment['groups'])}")
for group in deployment['groups']:
gseq = group['group_id']['gseq']
state = group['state']
name = group['name']
print(f"Group {gseq}: {name} (state {state})")
else:
print("Deployment not found")
Utils¶
validate_sdl()¶
Validate SDL (Stack Definition Language) structure and contents.
Required parameters:
sdl(Dict[str, Any]): SDL configuration dictionary
Returns: Dict[str, Any] - Validation result with the following structure:
valid(bool): Whether SDL is validerrors(List[str]): List of validation errorswarnings(List[str]): List of validation warningsdetails(Dict): Detailed validation informationversion(str): SDL version from inputservices(List[str]): List of service names from services sectionresources(Dict): Resource specifications by service nameservice_name(Dict): Service resource detailscpu(str): CPU units value or "N/A"memory(str): Memory size value or "N/A"storage(str): Storage size value or "N/A"
pricing(Dict): Pricing specifications by placement nameplacement_name(Dict): Placement pricing detailsservice_name(str): Formatted pricing string "amount denom"
deployment(Dict): Deployment configuration by service nameservice_name(Dict): Placement configuration for serviceplacement_name(Dict): Placement detailsprofile(str): Profile name to usecount(int): Number of instances
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
sdl = {
"version": "2.0",
"services": {
"web": {
"image": "nginx:latest",
"expose": [{"port": 80, "as": 80, "to": [{"global": True}]}]
}
},
"profiles": {
"compute": {
"web": {
"resources": {
"cpu": {"units": "0.1"},
"memory": {"size": "128Mi"},
"storage": {"size": "512Mi"}
}
}
},
"placement": {
"akash": {
"pricing": {
"web": {"denom": "uakt", "amount": "1000"}
}
}
}
},
"deployment": {
"web": {"akash": {"profile": "web", "count": 1}}
}
}
validation = client.deployment.validate_sdl(sdl)
if validation['valid']:
print("SDL is valid")
print(f"Services: {validation['details']['services']}")
else:
print("SDL validation failed:")
for error in validation['errors']:
print(f"Error: {error}")
if validation['warnings']:
print("Warnings:")
for warning in validation['warnings']:
print(f"Warning: {warning}")
get_service_logs()¶
Get logs from a service running in your deployment using WebSocket streaming.
def get_service_logs(provider_endpoint: str, lease_id: Dict[str, Any],
service_name: str = None, tail: int = 100,
cert_pem: str = None, key_pem: str = None, timeout: int = 30) -> List[str]
Required parameters:
provider_endpoint(str): Provider's HTTPS endpoint (e.g., "provider.akash.network:8443")lease_id(Dict): Lease identifier dictionary with the following fields:owner(str): Deployment owner addressdseq(int): Deployment sequence numbergseq(int): Group sequence numberoseq(int): Order sequence numberprovider(str): Provider address
Optional parameters:
service_name(str): Specific service name to get logs from (default: None for all services)tail(int): Number of recent log lines to retrieve (default: 100)cert_pem(str): Client certificate in PEM format for mTLS authentication (default: auto-load from client store)key_pem(str): Client private key in PEM format for mTLS authentication (default: auto-load from client store)timeout(int): WebSocket connection timeout in seconds (default: 30)
Returns: List[str] - List of log lines from the service(s)
⚠️ Note: Not all providers capture application logs. Ensure your deployment produces logs and the provider supports log capture before expecting results.
Example:
from akash import AkashClient, AkashWallet
client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")
lease_id = {
"owner": wallet.address,
"dseq": 12345,
"gseq": 1,
"oseq": 1,
"provider": "akash1..."
}
logs = client.deployment.get_service_logs(
provider_endpoint="provider.europlots.com:8443",
lease_id=lease_id,
service_name="web",
tail=50
)
print(f"Retrieved {len(logs)} log lines:")
for line in logs:
print(line)
stream_service_logs()¶
Stream logs from a service running in your deployment in real-time using WebSocket streaming.
def stream_service_logs(provider_endpoint: str, lease_id: Dict[str, Any],
service_name: str = None, follow: bool = True,
cert_pem: str = None, key_pem: str = None, timeout: int = 300)
Required parameters:
provider_endpoint(str): Provider's HTTPS endpoint (e.g., "provider.akash.network:8443")lease_id(Dict): Lease identifier dictionary with the following fields:owner(str): Deployment owner addressdseq(int): Deployment sequence numbergseq(int): Group sequence numberoseq(int): Order sequence numberprovider(str): Provider address
Optional parameters:
service_name(str): Specific service name to stream logs from (default: None for all services)follow(bool): Whether to keep streaming new logs liketail -f(default: True)cert_pem(str): Client certificate in PEM format for mTLS authentication (default: auto-load from client store)key_pem(str): Client private key in PEM format for mTLS authentication (default: auto-load from client store)timeout(int): Maximum streaming duration in seconds (default: 300)
Yields: str - Log lines as they are received from the service in real-time
⚠️ Note: Not all providers capture application logs. The stream will continue indefinitely if follow=True until timeout or manual interruption.
Deployment owners can access their deployment logs with valid certificates.
Example:
from akash import AkashClient, AkashWallet
client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")
lease_id = {
"owner": wallet.address,
"dseq": 12345,
"gseq": 1,
"oseq": 1,
"provider": "akash1..."
}
print("Streaming logs (Ctrl+C to stop)...")
for log_line in client.deployment.stream_service_logs(
provider_endpoint="provider.europlots.com:8443",
lease_id=lease_id,
service_name="web",
follow=True
):
print(f"[LOG] {log_line}")
Deployment log functionality requires mTLS authentication using client certificates. See Certificate documentation for setup details. The SDK automatically uses certificates from the client store.
Manifest submission¶
submit_manifest_to_provider()¶
Submit deployment manifest to provider with automatic protocol fallback.
def submit_manifest_to_provider(provider_endpoint: str, lease_id: Dict[str, Any],
manifest: Dict[str, Any], cert_pem: str = None,
key_pem: str = None, timeout: int = 60,
use_http: bool = False) -> Dict[str, Any]
Required parameters:
provider_endpoint(str): Provider endpoint (e.g., "provider.akash.network:8443")lease_id(Dict[str, Any]): Lease identifier dictionaryowner(str, required): Deployment owner addressdseq(strorint, required): Deployment sequence numbergseq(strorint, required): Group sequence numberoseq(strorint, required): Order sequence numberprovider(str, required): Provider address
manifest(Dict[str, Any]): Manifest data dictionaryversion(str, required): Manifest version (e.g., "2.0")services(Dict[str, Dict], required): Service definitions (keys are service names like "web", "api")image(str, required): Docker image nameexpose(List[Dict], optional): Port exposure configurationport(int, required): Internal port numberas(int, required): External port numberto(List[Dict], required): Exposure targetsglobal(bool, optional): Expose globallyservice(str, optional): Expose to specific service
env(List[str], optional): Environment variablescommand(List[str], optional): Container command override
profiles(Dict[str, Dict], required): Resource and placement profilescompute(Dict[str, Dict], required): Compute resource profiles (keys are profile names like "web", "api")resources(Dict, required): Resource requirementscpu(Dict, required): CPU specificationunits(strorfloat, required): CPU units
memory(Dict, required): Memory specificationquantity(Dict, required): Memory quantityval(str, required): Memory value (e.g., "256Mi")
storage(DictorList[Dict], required): Storage specificationquantity(Dict, required): Storage quantity (if single)val(str, required): Storage value (e.g., "1Gi")
size(str, optional): Storage size (alternative format)attributes(Dict, optional): Storage attributespersistent(str, optional): Persistence settingclass(str, optional): Storage class
placement(Dict[str, Dict], required): Placement profiles (keys are placement names like "akash")attributes(Dict[str, str], optional): Provider attribute requirements as key-value pairspricing(Dict[str, Dict], required): Pricing for services (keys are service names)denom(str, required): Token denomination (e.g., "uakt")amount(str, required): Price amount
deployment(Dict[str, Dict], required): Deployment mapping (keys are service names)profile(str, required): Compute profile namecount(int, required): Instance count
Optional parameters:
cert_pem(str): Client certificate PEM (default: auto-load from cert store)key_pem(str): Client private key PEM (default: auto-load from cert store)timeout(int): Request timeout in seconds (default: 60)use_http(bool): Force HTTP method instead of gRPC first (default: False)
Returns: Dict[str, Any] - Submission result with the following fields:
status(str): Submission status ("success" or "error")method(str, optional): Protocol used ("gRPC" or "HTTP")error(str, optional): Error message if status is "error"
Example:
from akash import AkashClient, AkashWallet
client = AkashClient("https://akash-rpc.polkachu.com:443")
wallet = AkashWallet.from_mnemonic("your mnemonic here")
lease_id = {
"owner": wallet.address,
"dseq": 12345,
"gseq": 1,
"oseq": 1,
"provider": provider_address
}
manifest = {
"version": "2.0",
"services": {
"web": {
"image": "nginx:latest",
"expose": [{"port": 80, "as": 80, "to": [{"global": True}]}]
}
},
"profiles": {
"compute": {
"web": {
"resources": {
"cpu": {"units": 0.1},
"memory": {"quantity": {"val": "512Mi"}},
"storage": {"quantity": {"val": "1Gi"}}
}
}
},
"placement": {
"akash": {"pricing": {"web": {"denom": "uakt", "amount": "100"}}}
}
},
"deployment": {
"web": {"akash": {"profile": "web", "count": 1}}
}
}
result = client.deployment.submit_manifest_to_provider(
provider_endpoint=provider_endpoint,
lease_id=lease_id,
manifest=manifest
)
if result["status"] == "success":
print(f"Manifest submitted via {result.get('method', 'gRPC')}")
else:
print(f"Manifest submission failed: {result['error']}")
Protocol fallback:
The method automatically tries gRPC first, then falls back to HTTP PUT if gRPC fails. This matches the console approach and provides maximum provider compatibility. Set use_http=True to force HTTP method.
Authentication:
Like log functions, manifest submission requires mTLS certificates. The SDK automatically uses certificates from the client store created during create_certificate().