Remember that Python script I shared with you all a couple of days ago? The one that fetches liquidity pools and makes you feel like a wizard commanding the blockchain seas? Well, I've been tinkering, and now it's even better!
What's New?

An unrelated photo of a Redwing,
shot by myself.
The updated version of the script now includes a JSON file that holds a list of Hive-Engine nodes to cycle through, making the script more resilient. The idea was to have a robust list of nodes, but... well, let's just say I hit a few bumps along the road. Currently, we only have two functioning hive-engine.com servers on the list.
But hey, two is better than none, right? 😅
Running the Improved Script
You can still run the script like before:
python fetch_liquidity_pools.py accountname TOKEN(:OTHERTOKEN)
Or if you're just feeling a bit more specific:
python fetch_liquidity_pools.py hive-engine ETH
The Enhanced Features:
- Node Switching: The script now loads a list of nodes from a JSON file (
nodes.json
) and cycles through them, selecting one randomly to handle your request. This helps ensure that if one node is down or slow, another can step in. - Caching Token Details: We now store token details in a local cache (
token_details_cache.json
). This prevents repeatedly fetching the same data, saving time and network bandwidth. - More Error Handling: Because we all know that dealing with APIs can be like herding cats, more robust error handling has been added. Now, you'll get better feedback on what went wrong, with retry mechanisms to recover from temporary issues. (You can uncomment the debugging statements to see a whole lot more what's happening behind the scenes!)
Example Output
To remind you of the magic, here's what the output looks like when fetching positions with the ETH token:
Liquidity Pool Positions with ETH token:
Token Pair: SWAP.HIVE:SWAP.ETH | Base Balance: 31206.634533 SWAP.HIVE | Quote Balance: 2.242819 SWAP.ETH
Token Pair: BEE:SWAP.ETH | Base Balance: 42215.817136 BEE | Quote Balance: 1.422489 SWAP.ETH
Token Pair: SWAP.BTC:SWAP.ETH | Base Balance: 0.007309 SWAP.BTC | Quote Balance: 0.169925 SWAP.ETH
Token Pair: SWAP.ETH:SWAP.USDT | Base Balance: 0.151195 SWAP.ETH | Quote Balance: 351.954867 SWAP.USDT
Token Pair: SWAP.ETH:SPS | Base Balance: 0.033753 SWAP.ETH | Quote Balance: 11164.970135 SPS
As easily, you can call it from your own Python scripts by importing it...
Using the script like a Pro
If you love incorporating ready-made tools (or just can't resist tinkering), you can do that too!
Here's how:
Just add this line to your script to access to the functions:
from fetch_liquidity_pools import get_filtered_pools
Once you’ve imported the script, you can call the get_filtered_pools function with your own parameters:
# Import the necessary function from your script
from fetch_liquidity_pools import get_filtered_pools
def sort_pools_by_balance(account_name, filter_token):
# Fetch the liquidity pools
pools = get_filtered_pools(account_name, filter_token)
# Sort the pools by base balance in descending order
sorted_pools = sorted(pools, key=lambda x: x['base_balance'], reverse=True)
print(f"\n--- Sorted Liquidity Pools for {filter_token.upper()} ---")
for pool in sorted_pools:
print(f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}")
if __name__ == "__main__":
# Example usage with your Hive account and desired token filter
account_name = 'your-hive-account'
filter_token = 'ETH' # Replace with the token you want to filter by
sort_pools_by_balance(account_name, filter_token)
Why Import?
By importing the script into your own programs, you can:
- Sort the pools (see the example)
- Automate repetitive tasks by fetching and filtering liquidity pools programmatically.
- Integrate liquidity data into larger trading or analysis pipelines.
- Customize the logic further to suit your specific needs.
💡A Few Tips:
Make sure the fetch_liquidity_pools.py
script is in the same directory as your script or in a directory that’s part of your Python path.
Don’t forget to have the nodes.json
and token_details_cache.json
files in the same directory as well, so the script can find them!
These are the contents of the nodes.json
file:
[
"https://api2.hive-engine.com/rpc/contracts",
"https://api.hive-engine.com/rpc/contracts"
]
The Updated Script
Now, let's get to the juicy bits. Here's the improved script with all the bells and whistles:
# fetch_liquidity_pools.py
import json
import os
import argparse
import requests
from time import sleep
from random import choice
# Hive-Engine API Nodes
NODES_FILE = 'nodes.json'
retry_delay = 5 # seconds to wait between retries
max_retries = 3 # Maximum number of retries
# Default values
DEFAULT_ACCOUNT_NAME = 'hive-engine' # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = 'BTC' # Replace with the desired default token to filter, or use 'ALL' to list all tokens
# File to store token details with precision
TOKEN_CACHE_FILE = 'token_details_cache.json'
cached_token_details = {}
hive_engine_nodes = []
def load_nodes():
global hive_engine_nodes
# Check if the nodes file exists
if os.path.exists(NODES_FILE):
try:
with open(NODES_FILE, 'r') as f:
hive_engine_nodes = json.load(f)
print("Loaded Hive-Engine nodes from file.")
except (ValueError, IOError):
print("Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes.")
else:
print("Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes.")
hive_engine_nodes = [] # Ensure nodes list is empty on error
def get_node():
# Choose a random node from the list
if hive_engine_nodes:
selected_node = choice(hive_engine_nodes)
print(f"Using Hive-Engine node: {selected_node}") # Print the current node
return selected_node
else:
print("Error: No Hive-Engine nodes available.")
return None
def load_token_cache():
global cached_token_details
# Check if the token cache file exists
if os.path.exists(TOKEN_CACHE_FILE):
try:
with open(TOKEN_CACHE_FILE, 'r') as f:
cached_token_details = json.load(f)
print("Loaded cached token details from file.")
except (ValueError, IOError):
print("Error: Failed to load token cache file. Starting with an empty cache.")
def save_token_cache():
# Save the current token details cache to a file
try:
with open(TOKEN_CACHE_FILE, 'w') as f:
json.dump(cached_token_details, f)
print("Saved token details to cache file.")
except IOError:
print("Error: Failed to save token cache file.")
def fetch_token_details(symbol):
# Check if token details are already cached
if symbol in cached_token_details:
print(f"Token details for {symbol} found in cache.")
return cached_token_details[symbol]
print (f"Fetching token details for {symbol}...")
# Fetch token details for the given symbol
for attempt in range(max_retries):
url = get_node()
if not url:
return {}
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "find",
"params": {
"contract": "tokens",
"table": "tokens",
"query": {"symbol": symbol},
"limit": 1
}
}
response = requests.post(url, json=payload)
print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")
if response.status_code == 200:
try:
data = response.json()
except ValueError:
print("Error: Failed to parse JSON response.")
return {}
if 'result' in data and data['result']:
cached_token_details[symbol] = data['result'][0] # Cache the token details
save_token_cache() # Save cache after updating
return data['result'][0]
print(f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}")
if attempt < max_retries - 1:
sleep(retry_delay)
else:
print(f"Max retries exceeded for {symbol}. Skipping.")
return {}
def fetch_pool_details(token_pair):
# Fetch details of the specified liquidity pool
for attempt in range(max_retries):
url = get_node()
if not url:
print("Error: No node URL available, exiting fetch_pool_details.")
return {}
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "find",
"params": {
"contract": "marketpools",
"table": "pools",
"query": {"tokenPair": token_pair},
"limit": 1
}
}
# print(f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}...") # Debugging statement
try:
response = requests.post(url, json=payload, timeout=10) # Set a timeout for the request
print(f"Received response status code: {response.status_code} for {token_pair} from {url}")
if response.status_code == 200:
try:
data = response.json()
# print(f"Data received for {token_pair}: {data}") # Debugging the received data
if 'result' in data and data['result']:
print(f"Successfully fetched pool details for {token_pair}")
return data['result'][0]
else:
print(f"Unexpected response format or empty result for {token_pair} from {url}: {data}")
except ValueError as e:
print("Error: Failed to parse JSON response.")
print(f"Response content: {response.text}") # Print the actual response content
else:
print(f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Request exception occurred for {token_pair} from {url}: {e}")
# Handle retries
if attempt < max_retries - 1:
print(f"Retrying after {retry_delay} seconds...")
sleep(retry_delay)
else:
print(f"Max retries exceeded for {token_pair}. Skipping to next.")
print(f"Returning empty details for {token_pair} after all attempts.")
return {}
def fetch_liquidity_positions(account_name):
# Fetch liquidity positions for the given account
url = get_node()
if not url:
return {}
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "find",
"params": {
"contract": "marketpools",
"table": "liquidityPositions",
"query": {"account": account_name},
"limit": 1000
}
}
response = requests.post(url, json=payload)
# print("Response Status Code: ", response.status_code)
# print("Response Content: ", response.text) # Debug
if response.status_code != 200:
print(f"Error: Failed to fetch data. Status Code: {response.status_code}")
return []
try:
data = response.json()
except ValueError:
print("Error: Failed to parse JSON response.")
return []
return data.get('result', [])
def get_filtered_pools(account_name, filter_token):
# Get and filter pools by the specified token
positions = fetch_liquidity_positions(account_name)
# Debug: Check fetched positions
print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")
if not positions:
print("No liquidity positions found for this account.")
return []
filtered_pools = []
for position in positions:
token_pair = position.get('tokenPair', 'Unknown')
# Debug: Print each position being processed
print(f"Processing position: {position}")
# If filter_token is 'ALL', skip filtering; otherwise, check for the token in the pair
if filter_token.upper() != 'ALL' and filter_token.upper() not in token_pair.upper():
print(f"Skipping position {token_pair} as it does not match filter token {filter_token.upper()}")
continue
# Additional debug to see which positions pass the filter
print(f"Including position {token_pair} with filter token {filter_token.upper()}")
# Fetch balances and calculate user share
shares = float(position.get('shares', '0'))
pool_details = fetch_pool_details(token_pair)
if not pool_details:
continue
total_shares = float(pool_details.get('totalShares', '0'))
base_quantity = float(pool_details.get('baseQuantity', '0'))
quote_quantity = float(pool_details.get('quoteQuantity', '0'))
if total_shares == 0:
print(f"Skipping position {token_pair} due to total shares being 0.")
continue
# Calculate user balances
user_base_balance = (shares / total_shares) * base_quantity
user_quote_balance = (shares / total_shares) * quote_quantity
if ':' in token_pair:
base_symbol, quote_symbol = token_pair.split(':')
else:
base_symbol, quote_symbol = "Unknown", "Unknown"
# Fetch token details to get precision
base_token_details = fetch_token_details(base_symbol)
quote_token_details = fetch_token_details(quote_symbol)
base_precision = base_token_details.get('precision', 0)
quote_precision = quote_token_details.get('precision', 0)
filtered_pools.append({
"token_pair": token_pair,
"base_symbol": base_symbol,
"quote_symbol": quote_symbol,
"base_balance": user_base_balance,
"quote_balance": user_quote_balance,
"base_precision": base_precision,
"quote_precision": quote_precision
})
# Debug: Print the number of filtered pools
# print(f"Number of filtered pools: {len(filtered_pools)}")
return filtered_pools
def main(account_name, filter_token):
# Load nodes from the external file
load_nodes()
# Load cached token details
load_token_cache()
# Fetch and print filtered pools
pools = get_filtered_pools(account_name, filter_token)
print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
for pool in pools:
print(f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}")
# Debug: If no pools were printed
# if not pools:
# print("No matching liquidity pools found for the given filter.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Fetch Hive-Engine liquidity pools.')
parser.add_argument('account_name', nargs='?', default=DEFAULT_ACCOUNT_NAME, help='Hive account name to fetch liquidity pools for.')
parser.add_argument('filter_token', nargs='?', default=DEFAULT_FILTER_TOKEN, help="Token to filter by, or 'ALL' to list all tokens.")
args = parser.parse_args()
main(args.account_name, args.filter_token)
What's Next?
I'd love to hear your thoughts on this update! Is it working for you, or is it just causing new headaches? Any nodes to add to the list or tips on getting those stubborn servers to talk? Let me know in the comments!
Keep experimenting, and may your code always run without errors! Cheers!