Discovery module¶
The discovery module provides functionality for discovering and monitoring Akash Network providers using a GRPC-first, HTTP fallback approach, along with network compatibility checks and service discovery.
DiscoveryClient class¶
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
discovery = client.discovery
Provider discovery¶
get_provider_status()¶
Get provider status using GRPC-first approach with HTTP fallback.
Required parameters:
provider_uri
(str
): Provider's URI or blockchain address (e.g., 'provider.akash.network:8443' or 'akash1provider...')
Optional parameters:
use_https
(bool
, optional): Whether to use HTTPS for HTTP fallback (default: True)
Protocol selection:
- If
provider_uri
contains ":" or starts with "http", treats it as a direct endpoint URI - For direct endpoints: attempts gRPC on ports 8443/8444, then falls back to HTTP(S)
- If
provider_uri
is a blockchain address: Uses GRPC client with version detection and protocol selection - All connections use insecure SSL to handle self-signed certificates
Returns: Dict[str, Any]
with the following fields:
status
(str
): Operation status ("success" or "failed")provider_status
(Dict
): Provider status information (if successful)address
(str
): Provider addresscluster_public_hostname
(str
): Cluster hostnameavailable_resources
(Dict
): Available resourcescpu
(int
): Available CPU in millicoresmemory
(int
): Available memory in bytesephemeral_storage
(int
): Available storage in bytesgpu
(int
): Available GPU count
pending_resources
(Dict
): Pending resourcescpu
(int
): Pending CPU in millicoresmemory
(int
): Pending memory in bytesephemeral_storage
(int
): Pending storage in bytesgpu
(int
): Pending GPU count
provider
(str
): Provider URIcluster
(Dict
): Cluster informationleases
(Dict
): Lease informationactive
(int
): Number of active leases
inventory
(Dict
): Resource inventorycluster
(Dict
): Cluster resource informationcpu
(Dict
): CPU informationval
(int
): CPU value in millicores
memory
(Dict
): Memory informationval
(int
): Memory value in bytes
ephemeral_storage
(Dict
): Storage informationval
(int
): Storage value in bytes
gpu
(Dict
): GPU informationval
(int
): GPU count
nodes
(List[Dict]
, optional): Per-node resource details
reservations
(Dict
): Reserved resourcespending
(Dict
): Pending reservationsresources
(Dict
): Pending resource allocationscpu
(Dict
): Pending CPUval
(int
): CPU value in millicores
memory
(Dict
): Pending memoryval
(int
): Memory value in bytes
ephemeral_storage
(Dict
): Pending storageval
(int
): Storage value in bytes
gpu
(Dict
): Pending GPUval
(int
): GPU count
resources
(Dict
): Aggregated resource informationcpu
(int
): Total CPU in millicoresmemory
(int
): Total memory in bytesstorage
(int
): Total storage in bytesgpu
(int
): Total GPU count
bid_engine
(Dict
): Bid engine status (provider-specific data)manifest
(Dict
): Manifest information (provider-specific data)errors
(List
): Any errors reported by providertimestamp
(str
): Status timestamppublic_hostnames
(List[str]
): Public hostnamesmethod
(str
): Connection method used ("GRPC" or "HTTP")status
(str
): Provider status ("online")
error
(str
): Error message (if failed)
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
status = client.discovery.get_provider_status("akash1provider123...")
status = client.discovery.get_provider_status("provider.akash.network:8443")
if status["status"] == "success":
provider_status = status["provider_status"]
print(f"Provider: {provider_status['provider']}")
print(f"Connection method: {provider_status['method']}")
print(f"Active leases: {provider_status['cluster']['leases']['active']}")
resources = provider_status['cluster']['resources']
print(f"available CPU: {resources['cpu']}")
print(f"available memory: {resources['memory']}")
print(f"available storage: {resources['storage']}")
print(f"available GPU: {resources['gpu']}")
else:
print(f"Failed to get status: {status['error']}")
get_providers_status()¶
Get status information for multiple providers.
Required parameters:
provider_uris
(List[str]
): List of provider URIs to check
Optional parameters:
use_https
(bool
, optional): Whether to use HTTPS for connections (default: True)
Returns: Dict[str, Any]
with the following fields:
status
(str
): Operation status ("success" or "failed")discovery_results
(Dict
): Status check results (if successful)total_providers
(int
): Total number of providers checkedsuccessful_connections
(int
): Number of successful connectionsfailed_connections
(int
): Number of failed connectionsproviders
(Dict
): Per-provider results keyed by provider URI[provider_uri]
(Dict
): Result for specific providerendpoint
(str
): Provider endpointaccessible
(bool
): Whether provider is accessiblestatus
(Dict
, optional): Provider status (if accessible, same structure asprovider_status
fromget_provider_status()
)error
(str
, optional): Error message (if not accessible)
summary
(Dict
): Aggregated summarytotal_resources
(Dict
): Total available resourcescpu
(int
): Total CPU in millicoresmemory
(int
): Total memory in bytesstorage
(int
): Total storage in bytesgpu
(int
): Total GPU count
total_leases
(int
): Total active leasesonline_providers
(int
): Number of online providers
error
(str
): Error message (if failed)
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
providers_result = client.discovery.get_providers(limit=100)
provider_uris = []
if providers_result["status"] == "success":
provider_uris = [provider.get("host_uri") for provider in providers_result.get("active_providers", []) if provider.get("host_uri")]
result = client.discovery.get_providers_status(provider_uris)
if result["status"] == "success":
results = result["discovery_results"]
print(f"Checked {results['total_providers']} providers")
print(f"Successfully connected to {results['successful_connections']} providers")
print(f"Failed to connect to {results['failed_connections']} providers")
summary = results["summary"]
print(f"Total online providers: {summary['online_providers']}")
print(f"Total active leases: {summary['total_leases']}")
print(f"Total CPU available: {summary['total_resources']['cpu']}")
print(f"Total memory available: {summary['total_resources']['memory']}")
for provider_uri, provider_info in results["providers"].items():
if provider_info.get("accessible"):
print(f"✓ {provider_uri}: Online")
else:
print(f"✗ {provider_uri}: {provider_info.get('error', 'Offline')}")
specific_providers = [
"provider1.akash.network:8443",
"provider2.akash.network:8443"
]
result = client.discovery.get_providers_status(specific_providers)
get_provider_capabilities()¶
Get provider capabilities and inventory via gRPC-first with HTTP fallback.
Required parameters:
provider_uri
(str
): Provider's URI
Optional parameters:
use_https
(bool
, optional): Whether to use HTTPS (default: True)
Returns: Dict[str, Any]
with the following fields:
status
(str
): Operation status ("success" or "failed")capabilities
(Dict
): Provider capabilities (if successful)storage
(Dict
): Ephemeral storage capabilities from inventoryval
(int
): Storage value in bytes
cpu
(Dict
): CPU capabilities from inventoryval
(int
): CPU value in millicores
memory
(Dict
): Memory capabilities from inventoryval
(int
): Memory value in bytes
gpu
(Dict
): GPU capabilities from inventoryval
(int
): GPU count
error
(str
): Error message (if failed)
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
caps = client.discovery.get_provider_capabilities("provider.akash.network:8443")
if caps["status"] == "success":
capabilities = caps["capabilities"]
print("Provider capabilities:")
print(f"CPU: {capabilities['cpu']['val']} millicores")
print(f"Memory: {capabilities['memory']['val']} bytes")
print(f"Storage: {capabilities['storage']['val']} bytes")
print(f"GPU: {capabilities['gpu']['val']} units")
get_provider_resources()¶
Get provider resource availability via gRPC-first with HTTP fallback.
Required parameters:
provider_uri
(str
): Provider's URI
Optional parameters:
use_https
(bool
, optional): Whether to use HTTPS (default: True)
Returns: Dict[str, Any]
with the following fields:
status
(str
): Operation status ("success" or "failed")resources
(Dict
):available
(Dict
): Available resourcescpu
(int
): Available CPU in millicores (aggregated from all cluster nodes)memory
(int
): Available memory in bytes (aggregated from all cluster nodes)ephemeral_storage
(int
): Available ephemeral storage in bytes (aggregated from all cluster nodes)gpu
(int
): Available GPU count (aggregated from all cluster nodes)
pending
(Dict
): Pending/reserved resourcescpu
(int
): Pending CPU in millicoresmemory
(int
): Pending memory in bytesephemeral_storage
(int
): Pending ephemeral storage in bytesgpu
(int
): Pending GPU count
error
(str
): Error message (if failed)
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
resources = client.discovery.get_provider_resources("provider.akash.network:8443")
if resources["status"] == "success":
available = resources["resources"]["available"]
pending = resources["resources"]["pending"]
print("Available resources:")
print(f"CPU: {available['cpu']} millicores")
print(f"Memory: {available['memory']} bytes ({available['memory'] / (1024**3):.2f} GB)")
print(f"Ephemeral Storage: {available['ephemeral_storage']} bytes ({available['ephemeral_storage'] / (1024**3):.2f} GB)")
print(f"GPU: {available['gpu']} units")
print("Pending resources:")
print(f"CPU: {pending['cpu']} millicores")
print(f"Memory: {pending['memory']} bytes")
get_provider_capacity()¶
Check if provider has sufficient capacity for required resources via gRPC-first with HTTP fallback.
def get_provider_capacity(provider_uri: str, required_resources: Dict[str, Any], use_https: bool = True) -> Dict[str, Any]
Required parameters:
provider_uri
(str
): Provider's URIrequired_resources
(Dict[str, Any]
): Required resourcescpu
(str
): CPU requirement (supports "m" suffix for millicores, e.g., "100m" or "1")memory
(str
): Memory requirement (supports Ki/Mi/Gi suffixes, e.g., "512Mi")storage
(str
): Storage requirement (maps to ephemeral_storage, supports Ki/Mi/Gi suffixes)
Optional parameters:
use_https
(bool
): Whether to use HTTPS (default: True)
Note: The capacity check automatically handles unit conversions. CPU values are converted between cores and millicores (1 core = 1000m). Storage requests using "storage"
are automatically mapped to the provider's ephemeral_storage
resources.
Returns: Dict[str, Any]
with the following fields:
status
(str
): Operation status ("success" or "failed")has_capacity
(bool
): Whether provider has sufficient capacity (if successful)sufficient
(Dict
): Per-resource sufficiency checkcpu
(bool
): CPU sufficiencymemory
(bool
): Memory sufficiencystorage
(bool
): Storage sufficiency
error
(str
): Error message (if failed)
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
required = {
"cpu": "1000m", # 1 CPU core (1000 millicores)
"memory": "1Gi", # 1GB RAM
"storage": "5Gi" # 5GB storage (maps to ephemeral_storage)
}
capacity = client.discovery.get_provider_capacity(
"provider.akash.network:8443",
required
)
if capacity["status"] == "success":
if capacity["has_capacity"]:
print("✓ Provider has sufficient capacity")
for resource, sufficient in capacity["sufficient"].items():
status = "✓" if sufficient else "✗"
print(f"{status} {resource}")
else:
print("✗ Provider does not have sufficient capacity")
for resource, sufficient in capacity["sufficient"].items():
if not sufficient:
print(f"Insufficient {resource}")
Queries¶
These methods query the blockchain for provider information.
get_providers()¶
Query all providers from the blockchain with categorization.
def get_providers(limit: Optional[int] = 100, offset: Optional[int] = None, count_total: bool = False) -> Dict[str, Any]
Optional parameters:
limit
(int
): Maximum number of providers to return (default: 100)offset
(int
): Number of results to skip (default: None)count_total
(bool
): Include total count in response (default: False)
Returns: Dict[str, Any]
with the following fields:
status
(str
): Query status ("success" or "error")total
(int
): Total number of providerssummary
(Dict
): Summary statisticsactive
(int
): Providers with endpointsinactive
(int
): Providers without endpoints
providers
(List[Dict]
): All providersowner
(str
): Provider's blockchain addresshost_uri
(str
): Provider's host URIattributes
(List[Dict]
): Provider attributesemail
(str
, optional): Provider emailwebsite
(str
, optional): Provider website
active_providers
(List[Dict]
): Providers with endpointsinactive_providers
(List[Dict]
): Providers without endpointserror
(str
, optional): Error message if status is "error"
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
providers = client.discovery.get_providers(limit=50, offset=0, count_total=True)
if providers["status"] == "success":
print(f"Total providers: {providers['total']}")
print(f"Active: {providers['summary']['active']}")
print(f"Inactive: {providers['summary']['inactive']}")
print("\nActive providers:")
for provider in providers["active_providers"][:5]:
print(f"- {provider['owner']}: {provider['host_uri']}")
for provider in providers["providers"][:3]:
print(f"\nProvider: {provider['owner']}")
if provider.get("host_uri"):
print(f"Endpoint: {provider['host_uri']}")
if provider.get("website"):
print(f"Website: {provider['website']}")
else:
print(f"Query failed: {providers['error']}")
get_client_info()¶
Get information about this SDK client and RPC compatibility.
Returns: Dict[str, Any]
with the following fields:
status
(str
): Query status ("success" or "error")api_version
(str
): RPC API version (e.g., "v1beta2")sdk_name
(str
): SDK name ("akash-python-sdk")sdk_version
(str
): SDK versionpython_version
(str
): Python versionplatform
(str
): Operating system platformarchitecture
(str
): System architecturesupported_endpoints
(List[str]
): Supported endpoint typesrpc_endpoint
(str
): RPC endpoint URLerror
(str
, optional): Error message if status is "error"
Example:
from akash import AkashClient
client = AkashClient("https://akash-rpc.polkachu.com:443")
client_info = client.discovery.get_client_info()
if client_info["status"] == "success":
print(f"API Version: {client_info['api_version']}")
print(f"SDK Version: {client_info['sdk_version']}")
print(f"Python Version: {client_info['python_version']}")
print(f"Platform: {client_info['platform']}")
print(f"RPC Endpoint: {client_info['rpc_endpoint']}")
print(f"Supported Endpoints: {', '.join(client_info['supported_endpoints'])}")
else:
print(f"Failed to get client info: {client_info['error']}")