Working with Concurrency
This page provides some tips for working with concurrent async functions in Bittensor.
This documentation is built against Bittensor Python SDK v10. For documentation built against SDK 9.12, see SDK 9.12 Archive.
Calls to the blockchain can be slow, and routines that make many calls in series become very slow. For example, suppose we want to check a list of UIDS for subnets and see if they exist. In series, we could execute the following, but it will take longer in proportion to the list of netuids, since it makes a separate call for each.
from bittensor.core.subtensor import Subtensor
subtensor = Subtensor("test")
for netuid in range(1, 8):
print("subnet " + str(netuid) + " exists: " + str(subtensor.subnet_exists(netuid=netuid)))
To avoid this, we could create a separate thread with a new Subtensor object on each thread, as below. This is faster, but can cause problems by hogging as many web sockets as UIDS in the list of subnets to check.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from bittensor.core.subtensor import Subtensor
def subnet_exists(netuid: int):
subtensor = Subtensor("test")
result = subtensor.subnet_exists(netuid=netuid)
subtensor.close()
print("subnet " + str(netuid) + " exists: " + str(result) )
with ThreadPoolExecutor() as executor:
subnets = executor.map(subnet_exists, range(1, 8))
Using Python’s asyncio (Asynchronous Input/Output) module, the code below accomplishes concurrent requests with a single websocket connection:
from bittensor.core.async_subtensor import AsyncSubtensor
import asyncio
async def main():
async with AsyncSubtensor("test") as subtensor:
block_hash = await subtensor.get_block_hash()
subnets = await asyncio.gather(*[subtensor.subnet_exists(netuid, block_hash=block_hash) for netuid in range(1, 8)])
for i, subnet in enumerate(subnets):
print("subnet " + str(1+i) + " exists: " + str(subnet))
asyncio.run(main())
Coroutine vs function
The usage of async def creates an asyncio coroutine rather than a function.
Coroutines differ from functions in that coroutines are run in event loops using await.
Coroutines always need to be awaited, and generally speaking, “asyncio objects” are instantiated with async with. See Python documentation on asyncio for a comprehensive section with examples.
AsyncSubstrateInterface
AsyncSubstrateInterface is an asyncio rewrite of Polkadot's original py-substrate-interface library, with a few added functionalities such as using the bt_decode for most SCALE decoding.
While AsyncSubstrateInterface is about 90% API compatible with py-substrate-interface, it is its own library. Similar to how we initialize aiohttp.ClientSession as shown in line 21 of the above example, the AsyncSubstrateInterface is initialized in the same way, as shown below:
async with AsyncSubstrateInterface(chain_endpoint) as substrate:
block_hash = await substrate.get_chain_head()
Generally speaking, you should use AsyncSubstrateInterface as a part of AsyncSubtensor, as the methods there are more specifically designed around using it within the Bittensor ecosystem.
AsyncSubtensor
AsyncSubtensor is the asyncio version of the Subtensor class. Under the hood, it utilises AsyncSubstrateInterface.
AsyncSubtensor vs Subtensor
Major differences between AsyncSubtensor and Subtensor are:
-
AsyncSubtensorusesblock_hashargs rather thanblockargs. -
AsyncSubtensoruses thereuse_blockarg. -
The usage of
block_hashis pretty obvious, i.e., you can specify the block hash (str) rather than the block number (int). -
The usage of
reuse_blockis a bit different. It allows you to reuse the same block hash that you have used for a previous call. -
Finally, the remaining major change is the use of args in the below
AsyncSubtensormethods:-
get_balance -
get_total_stake_for_coldkey -
get_total_stake_for_hotkeyThese methods now accept multiple SS58 addresses as addresses, and return a dictionary mapping the addresses to their balances.
-
Initializing AsyncSubtensor
Like AsyncSubstrateInterface, the AsyncSubtensor is initialized the same way (with an async context manager):
from bittensor.core.async_subtensor import AsyncSubtensor
async def main():
async with AsyncSubtensor("test") as subtensor:
neurons = await subtensor.neurons_lite(1)
And because AsyncSubtensor uses AsyncSubstrateInterface under the hood, we can use this to our advantage in asyncio gathering:
import asyncio
import time
from bittensor.core.async_subtensor import AsyncSubtensor
async def main():
async with AsyncSubtensor(network="test") as subtensor:
start = time.time()
total_subnets = await subtensor.get_total_subnets()
neurons = await asyncio.gather(*[
subtensor.neurons(netuid=x)
for x in range(1, total_subnets + 1)
])
print(time.time() - start)
asyncio.run(main())
The above code example pulls all the neurons from the testnet. Compare this to the below sync version of the same code:
import time
from bittensor.core.subtensor import Subtensor
def sync_main():
subtensor = Subtensor(network="test")
start = time.time()
block = subtensor.block
total_subnets = subtensor.get_total_subnets(block)
neurons = [subtensor.neurons(netuid=x, block=block) for x in range(1, total_subnets + 1)]
print(time.time() - start)
sync_main()
Performance
On a high-latency South African Internet connection:
- The
asyncversion runs in 13.02 seconds. - The sync version runs in 1566.86 seconds.
Hence the
asyncversion runs a full 120X faster. Your results will vary depending on your connection latency.
Note that there is a bit of overhead with the async instantiation over the sync version. Therefore, if you are writing a script to maybe retrieve one item, it will likely be faster and less complex using the sync version. However, if you are building anything more complex than this, the async version will likely be faster, as shown in the above example.
Example
Below is an example of how you can use the AsyncSubtensor module to retrieve balances from multiple coldkey SS58 addresses in various ways:
Also see the Bittensor SDK reference for AsyncSubtensor.
import asyncio
from bittensor.core.subtensor import Subtensor # sync
from bittensor.core.async_subtensor import AsyncSubtensor # async
from bittensor.utils.balance import Balance
COLDKEY_PUB = "5EhCvSxpFRgXRCaN5LH2wRCD5su1vKsnVfYfjzkqfmPoCy2G"
COLDKEY_PUBS = [
COLDKEY_PUB,
"5CZrQzo3W6LGEopMw2zVMugPcwFBmQDYne3TJc9XzZbTX2WR",
"5Dcmx3kNTKqExHoineVpBJ6HnD9JHApRs8y2GFBgPLCaYn8d",
"5DZaBZKKGZBGaevi42bYUK44tEuS3SYJ7GU3rQKYr7kjfa8v"
]
async def main(): # define a coroutine with `async def`
sync_sub = Subtensor(network="finney")
async with AsyncSubtensor(network="finney") as async_subtensor:
sync_balance: Balance = sync_sub.get_balance(COLDKEY_PUB)
print(f"Sync balance: {sync_balance}")
async_balance: dict[str, Balance] = await async_subtensor.get_balance(COLDKEY_PUB)
print(f"Async balance: {async_balance}")
async_balances: dict[str, Balance] = await async_subtensor.get_balances(*COLDKEY_PUBS)
print(f"Async multiple balances: {async_balances}")
sync_balances: dict[str, Balance] = {}
for coldkey in COLDKEY_PUBS:
sync_balances[coldkey] = sync_sub.get_balance(coldkey)
print(f"Sync multiple balances: {sync_balances}")
async_delegated, async_balance = await asyncio.gather(
async_subtensor.get_delegated(COLDKEY_PUB),
async_subtensor.get_balance(COLDKEY_PUB)
)
print(f"Delegated: {async_delegated}")
print(f"Balance: {async_balance}")
# This will make concurrent calls to retrieve the delegates and balance of this coldkey
# We can even chain these together quite dramatically, such as this example in btcli wallets:
"""
async def main():
async with AsyncSubtensor(network="finney") as subtensor:
# Get current block hash for consistency
block_hash = await subtensor.get_block_hash()
# Get total subnets to query
total_subnets = await subtensor.get_total_subnets(block_hash=block_hash)
all_netuids = list(range(1, min(total_subnets + 1, 10))) # Limit to first 10 for testing
# Example wallet addresses (in real usage these would come from wallet objects)
wallets_with_ckp_file = COLDKEY_PUBS # Using the addresses directly
# Concurrent batch fetch
balances, all_neurons, all_delegates = await asyncio.gather(
subtensor.get_balances( # Fixed: use get_balances for multiple addresses
*wallets_with_ckp_file,
block_hash=block_hash,
),
asyncio.gather(
*[
subtensor.neurons_lite(netuid=netuid, block_hash=block_hash)
for netuid in all_netuids
]
),
asyncio.gather(
*[
subtensor.get_delegated(addr)
for addr in wallets_with_ckp_file
]
),
)
# Print results
print(f"Balances: {balances}")
print(f"Neurons from {len(all_neurons)} subnets")
print(f"Delegates: {len(all_delegates)} results")
"""
# There are also certain changes for the decoding of SCALE objects from the chain.
# As a rule of thumb, anything using `.value` from sync Subtensor just returns the value itself
# See the example of `Subtensor._get_hyperparameter` vs `AsyncSubtensor.get_hyperparameter`:
# Subtensor
"""
result = self.query_subtensor(param_name, block, [netuid])
if result is None or not hasattr(result, "value"):
return None
return result.value
"""
# AsyncSubtensor
"""
result = await self.substrate.query(
module="SubtensorModule",
storage_function=param_name,
params=[netuid],
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
if result is None:
return None
return result
"""
if __name__ == "__main__":
asyncio.run(main()) # coroutines need to have something to run them, usually `asyncio.run`