Inventory module¶
The inventory module provides off-chain provider resource inventory functionality for the Akash Network, enabling queries of cluster and node resource availability from provider endpoints.
Off-Chain Module
The inventory module queries provider endpoints via the discovery module to retrieve cluster resource information. It does not interact with the Akash blockchain directly and has no transaction functions.
InventoryClient Class¶
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
inventory = client.inventory
Queries¶
query_cluster_inventory()¶
Queries cluster inventory from a provider's status endpoint.
Required parameters:
provider_endpoint
(str
): Provider endpoint (e.g., "provider.europlots.com:8443")
Returns: Dict
- Cluster inventory data with the following fields:
status
(str
): Query status ("success" or "error")nodes
(List[Dict]
): List of nodes with resource informationname
(str
): Node nameresources
(Dict
): Node resource informationcpu
(Dict
): CPU resourcesallocatable
(str
): Allocatable CPUavailable
(str
): Available CPUcapacity
(str
): Total CPU capacity
memory
(Dict
): Memory resourcesallocatable
(str
): Allocatable memoryavailable
(str
): Available memorycapacity
(str
): Total memory capacity
gpu
(Dict
): GPU resourcesallocatable
(str
): Allocatable GPUavailable
(str
): Available GPUcapacity
(str
): Total GPU capacity
ephemeral_storage
(Dict
): Storage resourcesallocatable
(str
): Allocatable storageavailable
(str
): Available storagecapacity
(str
): Total storage capacity
capabilities
(Dict
): Node capabilitiesstorage_classes
(List[str]
): Available storage classes
storage
(List[Dict]
): List of storage classes availableclass
(str
): Storage class namesize
(str
): Available storage size
provider
(str
): Provider endpoint queriederror
(str
, optional): Error message if status is "error"
Note: This method uses the discovery module to query the provider's status endpoint and extract inventory data from the cluster status information.
Basic example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
inventory = client.inventory.query_cluster_inventory("provider.europlots.com:8443")
if inventory["status"] == "success":
print(f"Provider cluster inventory:")
print(f"Nodes: {len(inventory['nodes'])}")
print(f"Storage classes: {len(inventory['storage'])}")
for node in inventory['nodes']:
resources = node['resources']
print(f"Node {node['name']}:")
print(f"CPU: {resources['cpu']['available']}/{resources['cpu']['capacity']} millicores")
print(f"Memory: {resources['memory']['available']}/{resources['memory']['capacity']} bytes")
print(f"GPU: {resources['gpu']['available']}/{resources['gpu']['capacity']} units")
else:
print(f"Failed to query inventory: {inventory['error']}")
Provider analysis with blockchain data example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
def analyze_provider(endpoint, provider_address):
blockchain_info = client.provider.get_provider(provider_address)
if blockchain_info:
print(f"Provider details:")
print(f"Owner: {blockchain_info.get('owner', 'Unknown')}")
print(f"Email: {blockchain_info.get('info', {}).get('email', 'Not provided')}")
print(f"Website: {blockchain_info.get('info', {}).get('website', 'Not provided')}")
attributes = blockchain_info.get('attributes', [])
if attributes:
print(f"Provider attributes ({len(attributes)} total):")
for attr in attributes:
key = attr.get('key', 'unknown')
value = attr.get('value', 'unknown')
print(f" {key} = {value}")
status_result = client.discovery.get_provider_status(endpoint, use_https=True)
if status_result.get('status') == 'success':
provider_status = status_result.get('provider_status', {})
if 'cluster' in provider_status:
cluster = provider_status['cluster']
leases = cluster.get('leases', 0)
print(f"Total leases: {leases}")
inventory = client.inventory.query_cluster_inventory(endpoint)
if inventory.get('status') == 'success':
nodes = inventory.get('nodes', [])
print(f"Cluster inventory:")
print(f" Nodes: {len(nodes)}")
total_cpu = 0
total_memory = 0
for i, node in enumerate(nodes):
print(f" Node {i+1}: {node.get('name', 'unknown')}")
resources = node.get('resources', {})
cpu_info = resources.get('cpu', {})
memory_info = resources.get('memory', {})
cpu_available = cpu_info.get('available', '0')
memory_available = memory_info.get('available', '0')
print(f" CPU: {cpu_available} millicores available")
print(f" Memory: {int(memory_available) // (1024**3):.1f} GB" if memory_available.isdigit() else f" Memory: {memory_available}")
if cpu_available.isdigit():
total_cpu += int(cpu_available)
if memory_available.isdigit():
total_memory += int(memory_available)
capabilities = node.get('capabilities', {})
storage_classes = capabilities.get('storage_classes', [])
if storage_classes:
print(f" Storage classes: {', '.join(storage_classes)}")
print(f" Cluster totals:")
print(f" Total CPU: {total_cpu:,} millicores ({total_cpu/1000:.1f} cores)")
print(f" Total Memory: {total_memory // (1024**3):.1f} GB")
analyze_provider(
"provider.europlots.com:8443",
"akash18ga02jzaq8cw52anyhzkwta5wygufgu6rsz6xc"
)
query_node_inventory()¶
Queries individual node inventory information from a provider's cluster.
Required parameters:
provider_endpoint
(str
): Provider endpoint
Optional parameters:
node_name
(str
, optional): Specific node name to query (default: "")
Returns: Dict
- Node inventory information with the following fields:
status
(str
): Query status ("success" or "error")name
(str
): Node nameresources
(Dict
): Node resources with cpu, memory, gpu, ephemeral_storagecpu
(Dict
): CPU resourcesallocatable
(str
): Allocatable CPUavailable
(str
): Available CPUcapacity
(str
): Total CPU capacity
memory
(Dict
): Memory resourcesallocatable
(str
): Allocatable memoryavailable
(str
): Available memorycapacity
(str
): Total memory capacity
gpu
(Dict
): GPU resourcesallocatable
(str
): Allocatable GPUavailable
(str
): Available GPUcapacity
(str
): Total GPU capacity
ephemeral_storage
(Dict
): Storage resourcesallocatable
(str
): Allocatable storageavailable
(str
): Available storagecapacity
(str
): Total storage capacity
capabilities
(Dict
): Node capabilities including storage_classesstorage_classes
(List[str]
): Available storage classes
error
(str
, optional): Error message if status is "error"
Note: If no node_name is specified, returns the first/primary node from the cluster. If a specific node_name is provided, searches for a matching node.
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
node_inventory = client.inventory.query_node_inventory("provider.example.com:8443")
if node_inventory["status"] == "success":
print(f"Node inventory:")
print(f"Node: {node_inventory['name']}")
resources = node_inventory['resources']
print(f"CPU: {resources['cpu']['available']}/{resources['cpu']['capacity']} millicores")
print(f"Memory: {resources['memory']['available']}/{resources['memory']['capacity']} bytes")
print(f"GPU: {resources['gpu']['available']}/{resources['gpu']['capacity']} units")
print(f"Storage: {resources['ephemeral_storage']['available']}/{resources['ephemeral_storage']['capacity']} bytes")
capabilities = node_inventory['capabilities']
print(f"Storage classes: {capabilities['storage_classes']}")
else:
print(f"Failed to query node inventory: {node_inventory['error']}")
specific_node = client.inventory.query_node_inventory(
"provider.example.com:8443",
node_name="worker-1"
)
aggregate_inventory_data()¶
Aggregates inventory data from multiple provider query results.
Required parameters:
inventory_results
(List[Dict]
): List of inventory query results from query_cluster_inventory()provider
(str
): Provider endpointstatus
(str
): Query statusnodes
(List[Dict]
): Node informationname
(str
): Node nameresources
(Dict
): Resource informationcpu
(Dict
): CPU resourcesallocatable
(str
): Allocatable CPUallocated
(str
): Allocated CPUcapacity
(str
): Total CPU capacity
memory
(Dict
): Memory resourcesallocatable
(str
): Allocatable memoryallocated
(str
): Allocated memorycapacity
(str
): Total memory capacity
gpu
(Dict
): GPU resourcesallocatable
(str
): Allocatable GPUallocated
(str
): Allocated GPUcapacity
(str
): Total GPU capacity
ephemeral_storage
(Dict
): Storage resourcesallocatable
(str
): Allocatable storageallocated
(str
): Allocated storagecapacity
(str
): Total storage capacity
capabilities
(Dict
): Node capabilitiesstorage_classes
(List[str]
): Available storage classes
storage
(List[Dict]
): Storage informationclass
(str
): Storage class name
Returns: Dict
- Aggregated inventory statistics with the following fields:
status
(str
): Aggregation status ("success" or "error")summary
(Dict
): Summary statisticstotal_providers_queried
(int
): Total number of providers queriedsuccessful_providers
(int
): Number of successful provider queriesfailed_providers
(int
): Number of failed provider queriestotal_nodes
(int
): Total nodes across all providersstorage_classes
(List[str]
): Unique storage classes found
resources
(Dict
): Aggregated resource totals for cpu, memory, gpu, ephemeral_storagecpu
(Dict
): Total CPU resourcescapacity
(int
): Total CPU capacityallocatable
(int
): Total allocatable CPUallocated
(int
): Total allocated CPU
memory
(Dict
): Total memory resourcescapacity
(int
): Total memory capacityallocatable
(int
): Total allocatable memoryallocated
(int
): Total allocated memory
gpu
(Dict
): Total GPU resourcescapacity
(int
): Total GPU capacityallocatable
(int
): Total allocatable GPUallocated
(int
): Total allocated GPU
ephemeral_storage
(Dict
): Total storage resourcescapacity
(int
): Total storage capacityallocatable
(int
): Total allocatable storageallocated
(int
): Total allocated storage
errors
(List[Dict]
): List of errors from failed provider queriesprovider
(str
): Provider endpointerror
(str
): Error message
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
providers = [
"provider1.example.com:8443",
"provider2.example.com:8443",
"provider3.example.com:8443"
]
inventory_results = []
for provider in providers:
result = client.inventory.query_cluster_inventory(provider)
inventory_results.append(result)
aggregated = client.inventory.aggregate_inventory_data(inventory_results)
if aggregated["status"] == "success":
summary = aggregated["summary"]
resources = aggregated["resources"]
print(f"Aggregated network inventory:")
print(f"Providers: {summary['successful_providers']}/{summary['total_providers_queried']}")
print(f"Total nodes: {summary['total_nodes']}")
print(f"Storage classes: {summary['storage_classes']}")
print(f"CPU: {resources['cpu']['capacity']} millicores capacity")
print(f"Memory: {resources['memory']['capacity']} bytes capacity")
print(f"GPU: {resources['gpu']['capacity']} units capacity")
else:
print(f"Aggregation failed: {aggregated['error']}")
Usage examples¶
Provider resource monitoring¶
from akash import AkashClient
def monitor_provider_resources(client, provider_endpoint: str):
print(f"Monitoring provider: {provider_endpoint}")
cluster_inv = client.inventory.query_cluster_inventory(provider_endpoint)
if cluster_inv["status"] != "success":
print(f"Failed to query cluster: {cluster_inv['error']}")
return
nodes = cluster_inv["nodes"]
storage = cluster_inv["storage"]
print(f"Resource overview:")
print(f"Nodes: {len(nodes)}")
print(f"Storage classes: {len(storage)}")
total_cpu_capacity = 0
total_cpu_available = 0
total_memory_capacity = 0
total_memory_available = 0
total_gpu_capacity = 0
total_gpu_available = 0
for node in nodes:
resources = node["resources"]
cpu_cap = int(resources["cpu"]["capacity"])
cpu_avail = int(resources["cpu"]["available"])
total_cpu_capacity += cpu_cap
total_cpu_available += cpu_avail
mem_cap = int(resources["memory"]["capacity"])
mem_avail = int(resources["memory"]["available"])
total_memory_capacity += mem_cap
total_memory_available += mem_avail
gpu_cap = int(resources["gpu"]["capacity"])
gpu_avail = int(resources["gpu"]["available"])
total_gpu_capacity += gpu_cap
total_gpu_available += gpu_avail
def calc_utilization(capacity, available):
if capacity == 0:
return 0
return ((capacity - available) / capacity) * 100
cpu_util = calc_utilization(total_cpu_capacity, total_cpu_available)
mem_util = calc_utilization(total_memory_capacity, total_memory_available)
gpu_util = calc_utilization(total_gpu_capacity, total_gpu_available)
print(f"Resource utilization:")
print(f"CPU: {cpu_util:.1f}% ({total_cpu_capacity - total_cpu_available:,} / {total_cpu_capacity:,} millicores)")
print(f"Memory: {mem_util:.1f}% ({(total_memory_capacity - total_memory_available) // (1024**3):.1f} / {total_memory_capacity // (1024**3):.1f} GB)")
print(f"GPU: {gpu_util:.1f}% ({total_gpu_capacity - total_gpu_available} / {total_gpu_capacity} units)")
client = AkashClient("https://akash-rpc.polkachu.com:443")
monitor_provider_resources(client, "provider.example.com:8443")
Network capacity analysis¶
from akash import AkashClient
def analyze_network_capacity(client, provider_list: list):
print(f"Analyzing capacity across {len(provider_list)} providers...")
inventory_results = []
for provider in provider_list:
inventory = client.inventory.query_cluster_inventory(provider)
inventory_results.append(inventory)
aggregated = client.inventory.aggregate_inventory_data(inventory_results)
if aggregated["status"] != "success":
print(f"Aggregation failed: {aggregated['error']}")
return
summary = aggregated["summary"]
resources = aggregated["resources"]
total_cpu_cores = int(resources["cpu"]["capacity"]) / 1000
available_cpu_cores = int(resources["cpu"]["allocatable"]) / 1000
total_memory_gb = int(resources["memory"]["capacity"]) / (1024**3)
available_memory_gb = int(resources["memory"]["allocatable"]) / (1024**3)
print(f"Network capacity analysis:")
print(f"Responding providers: {summary['successful_providers']}/{summary['total_providers_queried']}")
print(f"Total nodes: {summary['total_nodes']}")
print(f"Storage classes: {summary['storage_classes']}")
print(f"")
print(f"CPU: {available_cpu_cores:,.1f} / {total_cpu_cores:,.1f} cores available")
print(f"Memory: {available_memory_gb:,.1f} / {total_memory_gb:,.1f} GB available")
cpu_util = ((int(resources["cpu"]["capacity"]) - int(resources["cpu"]["allocatable"])) / int(resources["cpu"]["capacity"]) * 100) if int(resources["cpu"]["capacity"]) > 0 else 0
mem_util = ((int(resources["memory"]["capacity"]) - int(resources["memory"]["allocatable"])) / int(resources["memory"]["capacity"]) * 100) if int(resources["memory"]["capacity"]) > 0 else 0
print(f"Utilization rates:")
print(f"CPU: {cpu_util:.1f}%")
print(f"Memory: {mem_util:.1f}%")
if aggregated["errors"]:
print(f"Failed providers:")
for error in aggregated["errors"]:
print(f"{error['provider']}: {error['error']}")
client = AkashClient("https://akash-rpc.polkachu.com:443")
provider_list = [
"provider1.example.com:8443",
"provider2.example.com:8443",
"provider3.example.com:8443"
]
analyze_network_capacity(client, provider_list)