Skip to content

Inventory module

The inventory module provides off-chain access to provider resource inventory on the Akash Network. It queries provider endpoints for cluster-level and node-level resource availability.

Off-Chain Module

The inventory module queries provider endpoints via the discovery module to retrieve cluster resource information. It does not interact with the Akash blockchain directly and has no transaction functions.

InventoryClient Class

from akash import AkashClient

# Create the SDK client against an Akash RPC endpoint.
client = AkashClient("https://akash-rpc.polkachu.com:443")

# The inventory module is reached as an attribute of the client.
inventory = client.inventory

Queries

query_cluster_inventory()

Queries cluster inventory from a provider's status endpoint.

def query_cluster_inventory(provider_endpoint: str) -> Dict[str, Any]

Required parameters:

  • provider_endpoint (str): Provider endpoint (e.g., "provider.europlots.com:8443")

Returns: Dict - Cluster inventory data with the following fields:

  • status (str): Query status ("success" or "error")
  • nodes (List[Dict]): List of nodes with resource information
    • name (str): Node name
    • resources (Dict): Node resource information
      • cpu (Dict): CPU resources
        • allocatable (str): Allocatable CPU
        • available (str): Available CPU
        • capacity (str): Total CPU capacity
      • memory (Dict): Memory resources
        • allocatable (str): Allocatable memory
        • available (str): Available memory
        • capacity (str): Total memory capacity
      • gpu (Dict): GPU resources
        • allocatable (str): Allocatable GPU
        • available (str): Available GPU
        • capacity (str): Total GPU capacity
      • ephemeral_storage (Dict): Storage resources
        • allocatable (str): Allocatable storage
        • available (str): Available storage
        • capacity (str): Total storage capacity
    • capabilities (Dict): Node capabilities
      • storage_classes (List[str]): Available storage classes
  • storage (List[Dict]): List of storage classes available
    • class (str): Storage class name
    • size (str): Available storage size
  • provider (str): Provider endpoint queried
  • error (str, optional): Error message if status is "error"

Note: This method uses the discovery module to query the provider's status endpoint and extract inventory data from the cluster status information.

Basic example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

# Ask the provider's status endpoint for its cluster inventory.
inventory = client.inventory.query_cluster_inventory("provider.europlots.com:8443")

if inventory["status"] != "success":
    print(f"Failed to query inventory: {inventory['error']}")
else:
    nodes = inventory["nodes"]
    print("Provider cluster inventory:")
    print(f"Nodes: {len(nodes)}")
    print(f"Storage classes: {len(inventory['storage'])}")

    # (label, resource key, unit) for each per-node line that is printed.
    report_rows = (
        ("CPU", "cpu", "millicores"),
        ("Memory", "memory", "bytes"),
        ("GPU", "gpu", "units"),
    )
    for node in nodes:
        resources = node["resources"]
        print(f"Node {node['name']}:")
        for label, kind, unit in report_rows:
            entry = resources[kind]
            print(f"{label}: {entry['available']}/{entry['capacity']} {unit}")

Example: provider analysis combining on-chain provider data with live inventory:

from akash import AkashClient

# Shared client: analyze_provider() reads this module-level name rather than taking it as a parameter.
client = AkashClient("https://akash-rpc.polkachu.com:443")

def analyze_provider(endpoint, provider_address):
    """Print a combined on-chain and off-chain report for one provider.

    Uses the module-level ``client`` for three lookups: the provider record
    (``client.provider.get_provider``), the live provider status
    (``client.discovery.get_provider_status``) and the cluster inventory
    (``client.inventory.query_cluster_inventory``). A lookup that returns
    nothing, or a status other than "success", is skipped without output.

    Args:
        endpoint: Provider endpoint, e.g. "provider.europlots.com:8443".
        provider_address: Provider address passed to ``get_provider``.

    Returns:
        None. Everything is written to stdout.
    """
    bytes_per_gb = 1024**3

    blockchain_info = client.provider.get_provider(provider_address)
    if blockchain_info:
        info = blockchain_info.get('info', {})
        print("Provider details:")
        print(f"Owner: {blockchain_info.get('owner', 'Unknown')}")
        print(f"Email: {info.get('email', 'Not provided')}")
        print(f"Website: {info.get('website', 'Not provided')}")

        attributes = blockchain_info.get('attributes', [])
        if attributes:
            print(f"Provider attributes ({len(attributes)} total):")
            for attr in attributes:
                key = attr.get('key', 'unknown')
                value = attr.get('value', 'unknown')
                print(f"  {key} = {value}")

    status_result = client.discovery.get_provider_status(endpoint, use_https=True)
    if status_result.get('status') == 'success':
        provider_status = status_result.get('provider_status', {})

        if 'cluster' in provider_status:
            cluster = provider_status['cluster']
            leases = cluster.get('leases', 0)
            print(f"Total leases: {leases}")

    inventory = client.inventory.query_cluster_inventory(endpoint)
    if inventory.get('status') != 'success':
        return

    nodes = inventory.get('nodes', [])
    print("Cluster inventory:")
    print(f"  Nodes: {len(nodes)}")

    total_cpu = 0
    total_memory = 0

    for i, node in enumerate(nodes):
        print(f"  Node {i+1}: {node.get('name', 'unknown')}")
        resources = node.get('resources', {})

        cpu_available = resources.get('cpu', {}).get('available', '0')
        memory_available = resources.get('memory', {}).get('available', '0')

        print(f"    CPU: {cpu_available} millicores available")
        if memory_available.isdigit():
            # True division: floor division would truncate 1.5 GB to "1.0 GB".
            print(f"    Memory: {int(memory_available) / bytes_per_gb:.1f} GB")
        else:
            # Non-numeric value: show it as reported instead of guessing.
            print(f"    Memory: {memory_available}")

        # Only plain digit strings contribute to the cluster totals.
        if cpu_available.isdigit():
            total_cpu += int(cpu_available)
        if memory_available.isdigit():
            total_memory += int(memory_available)

        capabilities = node.get('capabilities', {})
        storage_classes = capabilities.get('storage_classes', [])
        if storage_classes:
            print(f"    Storage classes: {', '.join(storage_classes)}")

    print("  Cluster totals:")
    print(f"    Total CPU: {total_cpu:,} millicores ({total_cpu/1000:.1f} cores)")
    print(f"    Total Memory: {total_memory / bytes_per_gb:.1f} GB")

# Endpoint for the off-chain queries, address for the on-chain provider record.
analyze_provider(
    endpoint="provider.europlots.com:8443",
    provider_address="akash18ga02jzaq8cw52anyhzkwta5wygufgu6rsz6xc",
)

query_node_inventory()

Queries individual node inventory information from a provider's cluster.

def query_node_inventory(provider_endpoint: str, node_name: str = "") -> Dict[str, Any]

Required parameters:

  • provider_endpoint (str): Provider endpoint

Optional parameters:

  • node_name (str, optional): Specific node name to query (default: "")

Returns: Dict - Node inventory information with the following fields:

  • status (str): Query status ("success" or "error")
  • name (str): Node name
  • resources (Dict): Node resources with cpu, memory, gpu, ephemeral_storage
    • cpu (Dict): CPU resources
      • allocatable (str): Allocatable CPU
      • available (str): Available CPU
      • capacity (str): Total CPU capacity
    • memory (Dict): Memory resources
      • allocatable (str): Allocatable memory
      • available (str): Available memory
      • capacity (str): Total memory capacity
    • gpu (Dict): GPU resources
      • allocatable (str): Allocatable GPU
      • available (str): Available GPU
      • capacity (str): Total GPU capacity
    • ephemeral_storage (Dict): Storage resources
      • allocatable (str): Allocatable storage
      • available (str): Available storage
      • capacity (str): Total storage capacity
  • capabilities (Dict): Node capabilities including storage_classes
    • storage_classes (List[str]): Available storage classes
  • error (str, optional): Error message if status is "error"

Note: If no node_name is specified, returns the first/primary node from the cluster. If a specific node_name is provided, searches for a matching node.

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

# No node_name is given, so the first/primary node of the cluster is returned.
node_inventory = client.inventory.query_node_inventory("provider.example.com:8443")

if node_inventory["status"] != "success":
    print(f"Failed to query node inventory: {node_inventory['error']}")
else:
    print("Node inventory:")
    print(f"Node: {node_inventory['name']}")

    resources = node_inventory["resources"]
    # (label, resource key, unit) for each line of the report.
    report_rows = (
        ("CPU", "cpu", "millicores"),
        ("Memory", "memory", "bytes"),
        ("GPU", "gpu", "units"),
        ("Storage", "ephemeral_storage", "bytes"),
    )
    for label, kind, unit in report_rows:
        entry = resources[kind]
        print(f"{label}: {entry['available']}/{entry['capacity']} {unit}")

    storage_classes = node_inventory["capabilities"]["storage_classes"]
    print(f"Storage classes: {storage_classes}")

# Target one node by name instead of taking the default node.
specific_node = client.inventory.query_node_inventory(
    "provider.example.com:8443",
    node_name="worker-1",
)

aggregate_inventory_data()

Aggregates inventory data from multiple provider query results.

def aggregate_inventory_data(inventory_results: list) -> Dict[str, Any]

Required parameters:

  • inventory_results (List[Dict]): List of inventory query results from query_cluster_inventory()
    • provider (str): Provider endpoint
    • status (str): Query status
    • nodes (List[Dict]): Node information
      • name (str): Node name
      • resources (Dict): Resource information
        • cpu (Dict): CPU resources
          • allocatable (str): Allocatable CPU
          • allocated (str): Allocated CPU
          • capacity (str): Total CPU capacity
        • memory (Dict): Memory resources
          • allocatable (str): Allocatable memory
          • allocated (str): Allocated memory
          • capacity (str): Total memory capacity
        • gpu (Dict): GPU resources
          • allocatable (str): Allocatable GPU
          • allocated (str): Allocated GPU
          • capacity (str): Total GPU capacity
        • ephemeral_storage (Dict): Storage resources
          • allocatable (str): Allocatable storage
          • allocated (str): Allocated storage
          • capacity (str): Total storage capacity
      • capabilities (Dict): Node capabilities
        • storage_classes (List[str]): Available storage classes
    • storage (List[Dict]): Storage information
      • class (str): Storage class name

Returns: Dict - Aggregated inventory statistics with the following fields:

  • status (str): Aggregation status ("success" or "error")
  • summary (Dict): Summary statistics
    • total_providers_queried (int): Total number of providers queried
    • successful_providers (int): Number of successful provider queries
    • failed_providers (int): Number of failed provider queries
    • total_nodes (int): Total nodes across all providers
    • storage_classes (List[str]): Unique storage classes found
  • resources (Dict): Aggregated resource totals for cpu, memory, gpu, ephemeral_storage
    • cpu (Dict): Total CPU resources
      • capacity (int): Total CPU capacity
      • allocatable (int): Total allocatable CPU
      • allocated (int): Total allocated CPU
    • memory (Dict): Total memory resources
      • capacity (int): Total memory capacity
      • allocatable (int): Total allocatable memory
      • allocated (int): Total allocated memory
    • gpu (Dict): Total GPU resources
      • capacity (int): Total GPU capacity
      • allocatable (int): Total allocatable GPU
      • allocated (int): Total allocated GPU
    • ephemeral_storage (Dict): Total storage resources
      • capacity (int): Total storage capacity
      • allocatable (int): Total allocatable storage
      • allocated (int): Total allocated storage
  • errors (List[Dict]): List of errors from failed provider queries
    • provider (str): Provider endpoint
    • error (str): Error message

Example:

from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

providers = [
    "provider1.example.com:8443",
    "provider2.example.com:8443",
    "provider3.example.com:8443",
]

# One cluster-inventory query per provider; every result is kept, whatever its status.
inventory_results = [
    client.inventory.query_cluster_inventory(provider) for provider in providers
]

aggregated = client.inventory.aggregate_inventory_data(inventory_results)

if aggregated["status"] != "success":
    print(f"Aggregation failed: {aggregated['error']}")
else:
    summary = aggregated["summary"]
    resources = aggregated["resources"]

    print("Aggregated network inventory:")
    print(f"Providers: {summary['successful_providers']}/{summary['total_providers_queried']}")
    print(f"Total nodes: {summary['total_nodes']}")
    print(f"Storage classes: {summary['storage_classes']}")

    # (label, resource key, unit) for each capacity line.
    capacity_rows = (
        ("CPU", "cpu", "millicores"),
        ("Memory", "memory", "bytes"),
        ("GPU", "gpu", "units"),
    )
    for label, kind, unit in capacity_rows:
        print(f"{label}: {resources[kind]['capacity']} {unit} capacity")

Usage examples

Provider resource monitoring

from akash import AkashClient

def monitor_provider_resources(client, provider_endpoint: str):
    """Print capacity and utilization totals for one provider's cluster.

    Queries ``client.inventory.query_cluster_inventory`` once, sums the
    ``capacity`` and ``available`` values of every node for cpu, memory and
    gpu, and prints the share of capacity that is not available.

    Args:
        client: Object exposing ``inventory.query_cluster_inventory``.
        provider_endpoint: Provider endpoint, e.g. "provider.example.com:8443".

    Returns:
        None. Results are written to stdout; a failed query prints the
        reported error and returns early.

    Raises:
        ValueError: If a node reports a capacity or available value that
            ``int()`` cannot parse.
    """
    print(f"Monitoring provider: {provider_endpoint}")

    cluster_inv = client.inventory.query_cluster_inventory(provider_endpoint)

    if cluster_inv["status"] != "success":
        print(f"Failed to query cluster: {cluster_inv['error']}")
        return

    nodes = cluster_inv["nodes"]
    storage = cluster_inv["storage"]

    print("Resource overview:")
    print(f"Nodes: {len(nodes)}")
    print(f"Storage classes: {len(storage)}")

    # Running totals per resource kind, summed over all nodes.
    totals = {
        kind: {"capacity": 0, "available": 0}
        for kind in ("cpu", "memory", "gpu")
    }
    for node in nodes:
        resources = node["resources"]
        for kind, total in totals.items():
            total["capacity"] += int(resources[kind]["capacity"])
            total["available"] += int(resources[kind]["available"])

    def calc_utilization(capacity, available):
        # A resource with no capacity (e.g. no GPUs) counts as 0% utilized.
        if capacity == 0:
            return 0
        return ((capacity - available) / capacity) * 100

    cpu_cap = totals["cpu"]["capacity"]
    cpu_used = cpu_cap - totals["cpu"]["available"]
    mem_cap = totals["memory"]["capacity"]
    mem_used = mem_cap - totals["memory"]["available"]
    gpu_cap = totals["gpu"]["capacity"]
    gpu_used = gpu_cap - totals["gpu"]["available"]

    cpu_util = calc_utilization(cpu_cap, totals["cpu"]["available"])
    mem_util = calc_utilization(mem_cap, totals["memory"]["available"])
    gpu_util = calc_utilization(gpu_cap, totals["gpu"]["available"])

    bytes_per_gb = 1024**3

    print("Resource utilization:")
    print(f"CPU: {cpu_util:.1f}% ({cpu_used:,} / {cpu_cap:,} millicores)")
    # True division: floor division would truncate fractional GB before formatting.
    print(f"Memory: {mem_util:.1f}% ({mem_used / bytes_per_gb:.1f} / {mem_cap / bytes_per_gb:.1f} GB)")
    print(f"GPU: {gpu_util:.1f}% ({gpu_used} / {gpu_cap} units)")

# Report current utilization for a single provider endpoint.
client = AkashClient("https://akash-rpc.polkachu.com:443")
monitor_provider_resources(client, provider_endpoint="provider.example.com:8443")

Network capacity analysis

from akash import AkashClient

def analyze_network_capacity(client, provider_list: list):
    """Print aggregated capacity and utilization across several providers.

    Queries the cluster inventory of every endpoint in ``provider_list``,
    passes all results to ``client.inventory.aggregate_inventory_data`` and
    prints the aggregated totals.

    Available resources are reported as ``allocatable - allocated`` and
    utilization as ``allocated / capacity``. ``allocatable`` alone is not a
    measure of free resources, because it does not change as workloads are
    scheduled.

    Args:
        client: Object exposing ``inventory.query_cluster_inventory`` and
            ``inventory.aggregate_inventory_data``.
        provider_list: Provider endpoints to query.

    Returns:
        None. Results are written to stdout; a failed aggregation prints the
        reported error and returns early.
    """
    print(f"Analyzing capacity across {len(provider_list)} providers...")

    inventory_results = [
        client.inventory.query_cluster_inventory(provider)
        for provider in provider_list
    ]

    aggregated = client.inventory.aggregate_inventory_data(inventory_results)

    if aggregated["status"] != "success":
        print(f"Aggregation failed: {aggregated['error']}")
        return

    summary = aggregated["summary"]
    resources = aggregated["resources"]

    def usage(kind):
        # Returns (capacity, available, utilization %) for one resource kind.
        entry = resources[kind]
        capacity = int(entry["capacity"])
        allocatable = int(entry["allocatable"])
        # A missing "allocated" total is treated as nothing allocated.
        allocated = int(entry.get("allocated", 0))
        # Clamp so inconsistent totals never show negative availability.
        available = max(allocatable - allocated, 0)
        utilization = (allocated / capacity * 100) if capacity > 0 else 0
        return capacity, available, utilization

    cpu_capacity, cpu_available, cpu_util = usage("cpu")
    mem_capacity, mem_available, mem_util = usage("memory")

    bytes_per_gb = 1024**3
    total_cpu_cores = cpu_capacity / 1000
    available_cpu_cores = cpu_available / 1000
    total_memory_gb = mem_capacity / bytes_per_gb
    available_memory_gb = mem_available / bytes_per_gb

    print("Network capacity analysis:")
    print(f"Responding providers: {summary['successful_providers']}/{summary['total_providers_queried']}")
    print(f"Total nodes: {summary['total_nodes']}")
    print(f"Storage classes: {summary['storage_classes']}")
    print("")
    print(f"CPU: {available_cpu_cores:,.1f} / {total_cpu_cores:,.1f} cores available")
    print(f"Memory: {available_memory_gb:,.1f} / {total_memory_gb:,.1f} GB available")

    print("Utilization rates:")
    print(f"CPU: {cpu_util:.1f}%")
    print(f"Memory: {mem_util:.1f}%")

    if aggregated["errors"]:
        print("Failed providers:")
        for error in aggregated["errors"]:
            print(f"{error['provider']}: {error['error']}")

client = AkashClient("https://akash-rpc.polkachu.com:443")

# Endpoints whose inventories are combined into one network-wide report.
provider_list = [f"provider{index}.example.com:8443" for index in (1, 2, 3)]

analyze_network_capacity(client, provider_list)