Discovery module¶
The discovery module discovers and monitors Akash Network providers using a gRPC-first approach with HTTP fallback. It also provides network compatibility checks and service discovery.
DiscoveryClient class¶
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
discovery = client.discovery
Provider discovery¶
get_provider_status()¶
Get a provider's status, trying gRPC first and falling back to HTTP.
Required parameters:
- provider_uri (str): Provider's URI or blockchain address (e.g., 'provider.akash.network:8443' or 'akash1provider...')
Optional parameters:
- use_https (bool, optional): Whether to use HTTPS for the HTTP fallback (default: True)
Protocol selection:
- If provider_uri contains ":" or starts with "http", it is treated as a direct endpoint URI
- For direct endpoints: attempts gRPC on ports 8443/8444, then falls back to HTTP(S)
- If provider_uri is a blockchain address: uses the gRPC client with version detection and protocol selection
- All connections use insecure SSL to handle self-signed certificates
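The selection rule above can be sketched as a small standalone function. This is an illustration only: classify_provider_uri is a hypothetical helper, not part of the SDK, and it encodes just the two documented checks (contains ":" or starts with "http"). The SDK's internal logic may differ.

```python
def classify_provider_uri(provider_uri: str) -> str:
    """Return "endpoint" for direct endpoint URIs, otherwise "address".

    Hypothetical helper that mirrors the documented rule only.
    """
    if ":" in provider_uri or provider_uri.startswith("http"):
        return "endpoint"
    return "address"


print(classify_provider_uri("provider.akash.network:8443"))  # endpoint
print(classify_provider_uri("akash1provider..."))            # address
```

A check like this can be useful for logging which path a call is expected to take before the request is made.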
Returns: Dict[str, Any] with the following fields:
- status (str): Operation status ("success" or "failed")
- provider_status (Dict): Provider status information (if successful)
  - address (str): Provider address
  - cluster_public_hostname (str): Cluster hostname
  - available_resources (Dict): Available resources
    - cpu (int): Available CPU in millicores
    - memory (int): Available memory in bytes
    - ephemeral_storage (int): Available storage in bytes
    - gpu (int): Available GPU count
  - pending_resources (Dict): Pending resources
    - cpu (int): Pending CPU in millicores
    - memory (int): Pending memory in bytes
    - ephemeral_storage (int): Pending storage in bytes
    - gpu (int): Pending GPU count
  - provider (str): Provider URI
  - cluster (Dict): Cluster information
    - leases (Dict): Lease information
      - active (int): Number of active leases
    - inventory (Dict): Resource inventory
      - cluster (Dict): Cluster resource information
        - cpu (Dict): CPU information
          - val (int): CPU value in millicores
        - memory (Dict): Memory information
          - val (int): Memory value in bytes
        - ephemeral_storage (Dict): Storage information
          - val (int): Storage value in bytes
        - gpu (Dict): GPU information
          - val (int): GPU count
        - nodes (List[Dict], optional): Per-node resource details
      - reservations (Dict): Reserved resources
        - pending (Dict): Pending reservations
          - resources (Dict): Pending resource allocations
            - cpu (Dict): Pending CPU
              - val (int): CPU value in millicores
            - memory (Dict): Pending memory
              - val (int): Memory value in bytes
            - ephemeral_storage (Dict): Pending storage
              - val (int): Storage value in bytes
            - gpu (Dict): Pending GPU
              - val (int): GPU count
    - resources (Dict): Aggregated resource information
      - cpu (int): Total CPU in millicores
      - memory (int): Total memory in bytes
      - storage (int): Total storage in bytes
      - gpu (int): Total GPU count
  - bid_engine (Dict): Bid engine status (provider-specific data)
  - manifest (Dict): Manifest information (provider-specific data)
  - errors (List): Any errors reported by the provider
  - timestamp (str): Status timestamp
  - public_hostnames (List[str]): Public hostnames
  - method (str): Connection method used ("GRPC" or "HTTP")
  - status (str): Provider status ("online")
- error (str): Error message (if failed)
Example:
from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

# Query by blockchain address...
status = client.discovery.get_provider_status("akash1provider123...")
# ...or by direct endpoint URI (this call overwrites the previous result)
status = client.discovery.get_provider_status("provider.akash.network:8443")

if status["status"] == "success":
    provider_status = status["provider_status"]
    print(f"Provider: {provider_status['provider']}")
    print(f"Connection method: {provider_status['method']}")
    print(f"Active leases: {provider_status['cluster']['leases']['active']}")
    resources = provider_status['cluster']['resources']
    print(f"available CPU: {resources['cpu']}")
    print(f"available memory: {resources['memory']}")
    print(f"available storage: {resources['storage']}")
    print(f"available GPU: {resources['gpu']}")
else:
    print(f"Failed to get status: {status['error']}")
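Because several nested fields are optional or provider-specific, a defensive lookup can save repeated key checks. The following is a minimal sketch, assuming the response consists of plain nested dicts as documented above. dig is a hypothetical helper (not an SDK function), and the sample dict is hand-written illustrative data, not real provider output.

```python
from typing import Any, Dict


def dig(data: Dict[str, Any], *keys: str, default: Any = None) -> Any:
    """Walk nested dicts key by key; return default if any step is missing."""
    current: Any = data
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


# Hand-written sample shaped like a successful response (illustrative values).
sample = {
    "status": "success",
    "provider_status": {
        "method": "GRPC",
        "cluster": {"leases": {"active": 3}},
    },
}

print(dig(sample, "provider_status", "cluster", "leases", "active"))       # 3
print(dig(sample, "provider_status", "cluster", "inventory", default={}))  # {}
```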
get_providers_status()¶
Get status information for multiple providers.
Required parameters:
- provider_uris (List[str]): List of provider URIs to check
Optional parameters:
- use_https (bool, optional): Whether to use HTTPS for connections (default: True)
Returns: Dict[str, Any] with the following fields:
- status (str): Operation status ("success" or "failed")
- discovery_results (Dict): Status check results (if successful)
  - total_providers (int): Total number of providers checked
  - successful_connections (int): Number of successful connections
  - failed_connections (int): Number of failed connections
  - providers (Dict): Per-provider results keyed by provider URI
    - [provider_uri] (Dict): Result for a specific provider
      - endpoint (str): Provider endpoint
      - accessible (bool): Whether the provider is accessible
      - status (Dict, optional): Provider status (if accessible, same structure as provider_status from get_provider_status())
      - error (str, optional): Error message (if not accessible)
  - summary (Dict): Aggregated summary
    - total_resources (Dict): Total available resources
      - cpu (int): Total CPU in millicores
      - memory (int): Total memory in bytes
      - storage (int): Total storage in bytes
      - gpu (int): Total GPU count
    - total_leases (int): Total active leases
    - online_providers (int): Number of online providers
- error (str): Error message (if failed)
Example:
from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

# Collect the host URIs of active providers from the blockchain
providers_result = client.discovery.get_providers(limit=100)
provider_uris = []
if providers_result["status"] == "success":
    provider_uris = [
        provider.get("host_uri")
        for provider in providers_result.get("active_providers", [])
        if provider.get("host_uri")
    ]

result = client.discovery.get_providers_status(provider_uris)
if result["status"] == "success":
    results = result["discovery_results"]
    print(f"Checked {results['total_providers']} providers")
    print(f"Successfully connected to {results['successful_connections']} providers")
    print(f"Failed to connect to {results['failed_connections']} providers")
    summary = results["summary"]
    print(f"Total online providers: {summary['online_providers']}")
    print(f"Total active leases: {summary['total_leases']}")
    print(f"Total CPU available: {summary['total_resources']['cpu']}")
    print(f"Total memory available: {summary['total_resources']['memory']}")
    for provider_uri, provider_info in results["providers"].items():
        if provider_info.get("accessible"):
            print(f"✓ {provider_uri}: Online")
        else:
            print(f"✗ {provider_uri}: {provider_info.get('error', 'Offline')}")

# Alternatively, check an explicit list of provider endpoints
specific_providers = [
    "provider1.akash.network:8443",
    "provider2.akash.network:8443"
]
result = client.discovery.get_providers_status(specific_providers)
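When only the reachable providers matter, the per-provider results can be partitioned in one pass. The sketch below assumes the discovery_results layout documented above. split_providers is a hypothetical helper, and sample_results is hand-written illustrative data rather than real output.

```python
from typing import Any, Dict, List, Tuple


def split_providers(
    discovery_results: Dict[str, Any],
) -> Tuple[List[str], Dict[str, str]]:
    """Return (accessible URIs, {inaccessible URI: error message})."""
    online: List[str] = []
    failed: Dict[str, str] = {}
    for uri, info in discovery_results.get("providers", {}).items():
        if info.get("accessible"):
            online.append(uri)
        else:
            failed[uri] = info.get("error", "unknown error")
    return online, failed


# Hand-written sample shaped like discovery_results (illustrative values).
sample_results = {
    "total_providers": 2,
    "successful_connections": 1,
    "failed_connections": 1,
    "providers": {
        "provider1.akash.network:8443": {"accessible": True, "status": {}},
        "provider2.akash.network:8443": {"accessible": False, "error": "timeout"},
    },
}

online, failed = split_providers(sample_results)
print(online)  # ['provider1.akash.network:8443']
print(failed)  # {'provider2.akash.network:8443': 'timeout'}
```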
get_provider_capabilities()¶
Get provider capabilities and inventory via gRPC-first with HTTP fallback.
Required parameters:
- provider_uri (str): Provider's URI
Optional parameters:
- use_https (bool, optional): Whether to use HTTPS (default: True)
Returns: Dict[str, Any] with the following fields:
- status (str): Operation status ("success" or "failed")
- capabilities (Dict): Provider capabilities (if successful)
  - storage (Dict): Ephemeral storage capabilities from inventory
    - val (int): Storage value in bytes
  - cpu (Dict): CPU capabilities from inventory
    - val (int): CPU value in millicores
  - memory (Dict): Memory capabilities from inventory
    - val (int): Memory value in bytes
  - gpu (Dict): GPU capabilities from inventory
    - val (int): GPU count
- error (str): Error message (if failed)
Example:
from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

caps = client.discovery.get_provider_capabilities("provider.akash.network:8443")
if caps["status"] == "success":
    capabilities = caps["capabilities"]
    print("Provider capabilities:")
    print(f"CPU: {capabilities['cpu']['val']} millicores")
    print(f"Memory: {capabilities['memory']['val']} bytes")
    print(f"Storage: {capabilities['storage']['val']} bytes")
    print(f"GPU: {capabilities['gpu']['val']} units")
get_provider_resources()¶
Get provider resource availability via gRPC-first with HTTP fallback.
Required parameters:
- provider_uri (str): Provider's URI
Optional parameters:
- use_https (bool, optional): Whether to use HTTPS (default: True)
Returns: Dict[str, Any] with the following fields:
- status (str): Operation status ("success" or "failed")
- resources (Dict): Resource availability
  - available (Dict): Available resources
    - cpu (int): Available CPU in millicores (aggregated from all cluster nodes)
    - memory (int): Available memory in bytes (aggregated from all cluster nodes)
    - ephemeral_storage (int): Available ephemeral storage in bytes (aggregated from all cluster nodes)
    - gpu (int): Available GPU count (aggregated from all cluster nodes)
  - pending (Dict): Pending/reserved resources
    - cpu (int): Pending CPU in millicores
    - memory (int): Pending memory in bytes
    - ephemeral_storage (int): Pending ephemeral storage in bytes
    - gpu (int): Pending GPU count
- error (str): Error message (if failed)
Example:
from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

resources = client.discovery.get_provider_resources("provider.akash.network:8443")
if resources["status"] == "success":
    available = resources["resources"]["available"]
    pending = resources["resources"]["pending"]
    print("Available resources:")
    print(f"CPU: {available['cpu']} millicores")
    # 1024**3 bytes is one GiB
    print(f"Memory: {available['memory']} bytes ({available['memory'] / (1024**3):.2f} GiB)")
    print(f"Ephemeral Storage: {available['ephemeral_storage']} bytes ({available['ephemeral_storage'] / (1024**3):.2f} GiB)")
    print(f"GPU: {available['gpu']} units")
    print("Pending resources:")
    print(f"CPU: {pending['cpu']} millicores")
    print(f"Memory: {pending['memory']} bytes")
get_provider_capacity()¶
Check if provider has sufficient capacity for required resources via gRPC-first with HTTP fallback.
def get_provider_capacity(provider_uri: str, required_resources: Dict[str, Any], use_https: bool = True) -> Dict[str, Any]
Required parameters:
- provider_uri (str): Provider's URI
- required_resources (Dict[str, Any]): Required resources
  - cpu (str): CPU requirement (supports "m" suffix for millicores, e.g., "100m" or "1")
  - memory (str): Memory requirement (supports Ki/Mi/Gi suffixes, e.g., "512Mi")
  - storage (str): Storage requirement (maps to ephemeral_storage, supports Ki/Mi/Gi suffixes)
Optional parameters:
- use_https (bool): Whether to use HTTPS (default: True)
Note: The capacity check automatically handles unit conversions. CPU values are converted between cores and millicores (1 core = 1000m). Storage requests using "storage" are automatically mapped to the provider's ephemeral_storage resources.
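To make the conversions in the note concrete, here is a rough sketch of how such requirement strings could be normalized. It is not the SDK's implementation: cpu_to_millicores, size_to_bytes, and normalize_requirements are hypothetical names, and the sketch assumes Ki/Mi/Gi are binary multiples (1024-based) and that a value without a suffix is a plain byte count.

```python
from typing import Dict

# Assumed binary multipliers for the documented suffixes.
_BINARY_UNITS = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3}


def cpu_to_millicores(value: str) -> int:
    """Convert "100m" to 100 and "1" to 1000 (1 core = 1000m)."""
    value = value.strip()
    if value.endswith("m"):
        return int(value[:-1])
    return round(float(value) * 1000)


def size_to_bytes(value: str) -> int:
    """Convert "512Mi" style strings to bytes."""
    value = value.strip()
    for suffix, factor in _BINARY_UNITS.items():
        if value.endswith(suffix):
            return round(float(value[: -len(suffix)]) * factor)
    return int(value)  # assumption: no suffix means a byte count


def normalize_requirements(required: Dict[str, str]) -> Dict[str, int]:
    """Map request keys to provider-side keys; "storage" becomes ephemeral_storage."""
    return {
        "cpu": cpu_to_millicores(required["cpu"]),
        "memory": size_to_bytes(required["memory"]),
        "ephemeral_storage": size_to_bytes(required["storage"]),
    }


print(normalize_requirements({"cpu": "1", "memory": "1Gi", "storage": "5Gi"}))
# {'cpu': 1000, 'memory': 1073741824, 'ephemeral_storage': 5368709120}
```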
Returns: Dict[str, Any] with the following fields:
- status (str): Operation status ("success" or "failed")
- has_capacity (bool): Whether the provider has sufficient capacity (if successful)
- sufficient (Dict): Per-resource sufficiency check
  - cpu (bool): CPU sufficiency
  - memory (bool): Memory sufficiency
  - storage (bool): Storage sufficiency
- error (str): Error message (if failed)
Example:
from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

required = {
    "cpu": "1000m",    # 1 CPU core (1000 millicores)
    "memory": "1Gi",   # 1 GiB RAM
    "storage": "5Gi"   # 5 GiB storage (maps to ephemeral_storage)
}
capacity = client.discovery.get_provider_capacity(
    "provider.akash.network:8443",
    required
)
if capacity["status"] == "success":
    if capacity["has_capacity"]:
        print("✓ Provider has sufficient capacity")
        for resource, sufficient in capacity["sufficient"].items():
            status = "✓" if sufficient else "✗"
            print(f"{status} {resource}")
    else:
        print("✗ Provider does not have sufficient capacity")
        for resource, sufficient in capacity["sufficient"].items():
            if not sufficient:
                print(f"Insufficient {resource}")
Queries¶
These methods query the blockchain for provider information.
get_providers()¶
Query all providers from the blockchain with categorization.
def get_providers(limit: Optional[int] = 100, offset: Optional[int] = None, count_total: bool = False) -> Dict[str, Any]
Optional parameters:
- limit (int): Maximum number of providers to return (default: 100)
- offset (int): Number of results to skip (default: None)
- count_total (bool): Include total count in response (default: False)
Returns: Dict[str, Any] with the following fields:
- status (str): Query status ("success" or "error")
- total (int): Total number of providers
- summary (Dict): Summary statistics
  - active (int): Providers with endpoints
  - inactive (int): Providers without endpoints
- providers (List[Dict]): All providers
  - owner (str): Provider's blockchain address
  - host_uri (str): Provider's host URI
  - attributes (List[Dict]): Provider attributes
  - email (str, optional): Provider email
  - website (str, optional): Provider website
- active_providers (List[Dict]): Providers with endpoints
- inactive_providers (List[Dict]): Providers without endpoints
- error (str, optional): Error message if status is "error"
Example:
from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

providers = client.discovery.get_providers(limit=50, offset=0, count_total=True)
if providers["status"] == "success":
    print(f"Total providers: {providers['total']}")
    print(f"Active: {providers['summary']['active']}")
    print(f"Inactive: {providers['summary']['inactive']}")
    print("\nActive providers:")
    for provider in providers["active_providers"][:5]:
        print(f"- {provider['owner']}: {provider['host_uri']}")
    for provider in providers["providers"][:3]:
        print(f"\nProvider: {provider['owner']}")
        if provider.get("host_uri"):
            print(f"Endpoint: {provider['host_uri']}")
        if provider.get("website"):
            print(f"Website: {provider['website']}")
else:
    print(f"Query failed: {providers['error']}")
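The limit and offset parameters allow paging through the provider list. The generator below is one possible sketch, not an SDK feature: iter_providers is a hypothetical helper that takes any callable accepting limit and offset keywords (such as client.discovery.get_providers) and assumes that a page shorter than the page size marks the end of the results. fake_fetch stands in for the network call so the example runs offline, and its owner values are made up.

```python
from typing import Any, Callable, Dict, Iterator


def iter_providers(
    fetch_page: Callable[..., Dict[str, Any]], page_size: int = 100
) -> Iterator[Dict[str, Any]]:
    """Yield providers page by page until a short page is returned."""
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)
        if page.get("status") != "success":
            raise RuntimeError(page.get("error", "provider query failed"))
        providers = page.get("providers", [])
        yield from providers
        if len(providers) < page_size:  # assumed end-of-results signal
            break
        offset += page_size


def fake_fetch(limit: int, offset: int) -> Dict[str, Any]:
    """Offline stand-in for get_providers, with made-up owners."""
    data = [{"owner": f"akash1example{i}", "host_uri": ""} for i in range(5)]
    return {"status": "success", "providers": data[offset:offset + limit]}


owners = [p["owner"] for p in iter_providers(fake_fetch, page_size=2)]
print(len(owners))  # 5
```

With a real client, the call would look like iter_providers(client.discovery.get_providers, page_size=50).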
get_client_info()¶
Get information about this SDK client and RPC compatibility.
Returns: Dict[str, Any] with the following fields:
- status (str): Query status ("success" or "error")
- api_version (str): RPC API version (e.g., "v1beta2")
- sdk_name (str): SDK name ("akash-python-sdk")
- sdk_version (str): SDK version
- python_version (str): Python version
- platform (str): Operating system platform
- architecture (str): System architecture
- supported_endpoints (List[str]): Supported endpoint types
- rpc_endpoint (str): RPC endpoint URL
- error (str, optional): Error message if status is "error"
Example:
from akash import AkashClient

client = AkashClient("https://akash-rpc.polkachu.com:443")

client_info = client.discovery.get_client_info()
if client_info["status"] == "success":
    print(f"API Version: {client_info['api_version']}")
    print(f"SDK Version: {client_info['sdk_version']}")
    print(f"Python Version: {client_info['python_version']}")
    print(f"Platform: {client_info['platform']}")
    print(f"RPC Endpoint: {client_info['rpc_endpoint']}")
    print(f"Supported Endpoints: {', '.join(client_info['supported_endpoints'])}")
else:
    print(f"Failed to get client info: {client_info['error']}")