Liquidity balancer (My little Python project)

in #hive2 months ago (edited)

I've been mulling over the posting of my new and improved 'fetch_liquidity_pools.py' script, along with a Liquidity Balancer I also wrote. However, I think now is the time to reveal my real progress.

There were several critical issues with my main script that kept me from releasing it, and even the latest update on the 'fetch_liquidity_pools.py' script was a bit misplaced, because soon after posting, I discovered a whole new batch of problems I had previously missed, and had to start tackling them one-by-one...

Key fixes and improvements:

  1. Multiple requests
    One of the issues I faced was that the main script was calling the fetcher for each token pair individually. Instead of fetching all the necessary liquidity information at once, the script was making multiple API calls for each pair. This not only slowed down performance but also unnecessarily overloaded the API.

    The solution was simply to modify the fetcher so that it now accepts a list of pairs, allowing it to gather all relevant token details in a single API call. This change significantly reduced the number of requests and improved the speed of both of the scripts.
    # Extract all token pairs from the configuration file
    token_pairs = [pair_info["pair"] for pair_info in pairs]

    # Fetch liquidity information for all pairs at once (optimizes API calls)
    pools = get_filtered_pools(account_name, token_pairs, session)
  1. Token cache initialization
    The second major issue was related to the token cache not being initialized when the fetcher was called from another script. This caused the script to fetch token details repeatedly, even if they were supposed to be already cached. Together with the Multiple Requests bug, it resulted in servers refusing to cooperate, and a plenty of connection errors.

    After realizing this, I moved the load_token_cache() function nearer to the top of the fetcher script, right after the declared functions, ensuring the cache is always loaded whether the script is run in standalone mode or when called by another script. This fix helped reduce unnecessary API calls and improved the overall performance.
  1. Duplicate node usage:
    The script was selecting Hive-Engine nodes multiple times, printing "Using Hive-Engine node" twice in standalone mode. This issue was caused by selecting nodes in more places than necessary. I fixed this by setting the node globally to avoid unnecessary selections.

""" Initialize the nodes and token cache """
load_nodes()
load_token_cache()
  1. Sessions and Streamlining API Calls:
    Initially, the script was making multiple calls to the API server without reusing the same connection. Implementing requests.Session() allowed me to keep the connection open across multiple requests, saving both time and resources. I also modified the main script to pass the session object to the fetcher so it could be used consistently throughout all requests.
# Create the session
with requests.Session() as session:
    balance_liquidity(
    account_name, private_active_key, pairs_to_balance, session
)

  1. Reorganizing the code:
    During this process, I also cleaned up the code structure. I moved functions like load_nodes() and load_token_cache() outside of the main() block to make sure they were always initialized and ready for action.

  2. Improved debugging:
    Throughout the debugging process, I added better error handling and more informative print statements to track down issues, particularly when dealing with the token cache and API responses.

  3. Balancing logic and erratic behavior:
    The Liquidity Balancer script initially showed erratic behavior — it wasn’t properly balancing liquidity pairs, and I couldn’t quite pinpoint the cause. Because of this, I held off on releasing the script until I could address these issues. After debugging and improving the core functionality, I finally managed to get the balancer working as expected, ensuring it adds or removes liquidity based on predefined thresholds.

    Most of these issues were external to the script, running out of reserve tokens and things like that. I will eventually need to make some additional changes to the balancer to iron them all out.

  4. Operation time check in the balancer:
    I "stole" this idea from @Arc7icWolf. (Thanks!)
    I had been using the Unix time command to measure the speed efficiency of my script, while he had come up with a Python solution to timing it.
    It's still true, I am lazy and still use the time command most of the time, but I just had to implement the timing operation in the balancer to see how long each balancing run takes. I love fancy output.


What does the LiquidityBalancer script actually do?

The LiquidityBalancer.py script is designed to automatically manage liquidity positions for a Hive-Engine account. It works by continuously monitoring the current balances in liquidity pools and making adjustments — either adding or removing liquidity — based on predefined target balances for each token pair. This is to ensure that the pools stay within healthy thresholds, maintaining optimal liquidity levels without manual intervention. (It can also empty your reserves if you are not careful.)

How it works:

  • The script loads a configuration file, which defines the target token balance for each liquidity pair.
  • It then fetches the current liquidity positions for the account from Hive-Engine and compares the actual balance to the target balance.
  • If the balance is above the target, the script removes excess liquidity. If the balance is below the target, it adds liquidity to match the desired level.
  • This process is repeated at random intervals (between 5 to 15 minutes) to ensure the liquidity remains balanced over time.

How to use the LiquidityBalancer script:

Here be dragons, take heed, and proceed at your own peril!

This script will not make you rich, but may eat your tokens AND your children, so be careful, please read it through and understand it before running it.

This means I will take no responsibility for any losses that may occur.

  1. Prepare the configuration file:
    You’ll need a JSON file for each account you want to balance, containing a list of token pairs and their target balances. Here’s an example of what the file might look like:

    [
        {
            "pair": "SWAP.BTC:SWAP.ETH",
            "target_balance": "0.5",
            "target_token": "SWAP.ETH",
            "threshold": "0.01"
        },
        {
            "pair": "DEC:SPS",
            "target_balance": "1000",
            "target_token": "DEC",
            "threshold": "5"
        }
    ]
    

    Each entry contains:

    • pair: The liquidity pool token pair to balance.
    • target_balance: The target balance you want to maintain for the target token.
    • target_token: The token within the pair to balance.
    • threshold: The range within which you’re willing to tolerate imbalance before the script takes action.
  2. Store your private active key:
    The script also needs access to your private active key in order to execute the add or remove liquidity actions on your behalf. You should store this in a .txt file named after your account, like so: your_account.private_active_key.txt.

  3. Run the script:
    Once the configuration file and private key are ready, you can run the script as follows:

    python LiquidityBalancer.py your_account_name
    

    The script will read the configuration file (e.g., your_account.pools_to_balance.json), fetch your current liquidity positions, and adjust them based on the target balances you defined. It will continue running in a loop, checking and balancing the liquidity at random intervals.

  4. Error handling and debugging:
    If the script encounters any issues (e.g., network failures, API errors, etc.), it has built-in error handling to retry the operations and log any failed attempts. You’ll also see informative messages about what actions are being taken, like adding or removing liquidity, or any errors that occur during the process.

    For more detailed output, uncomment the debug print statements.


Liquidity Balancer and its companion script

I almost forgot. Here are the scripts themselves:

LiquidityBalancer.py:

# LiquidityBalancer.py

import time
import json
import argparse
import random
from decimal import Decimal, ROUND_DOWN
from beem import Hive
from beem.transactionbuilder import TransactionBuilder
from beembase.operations import Custom_json
from fetch_liquidity_pools import get_filtered_pools, load_nodes, get_node
import requests  # For the session support

# Example usage:
# 'python LiquidityBalancer.py accountname'

# Configuration


class Colours:
    GREEN = "\033[92m"
    ORANGE = "\033[38;5;208m"
    CYAN = "\033[96m"
    RESET = "\033[0m"
    BLUE = "\033[94m"
    YELLOW = "\033[93m"
    MAGENTA = "\033[95m"
    WHITE = "\033[97m"

    RESET = "\033[0m"


# Function to print coloured messages to the terminal
def print_coloured(message, colour):
    print(f"{colour}{message}{Colours.RESET}")


def read_private_key(file_path):
    """Read the private key from a file."""
    try:
        with open(file_path, "r") as key_file:
            private_key = key_file.read().strip()
        print(f"Private key loaded from {file_path}")
        return private_key
    except Exception as e:
        print(f"Error: {e}")
        exit(1)


# Function to add liquidity to a token pair on Hive-Engine
def add_liquidity(
    account_name, private_active_key, token_pair, base_quantity, quote_quantity
):
    """Add liquidity to a specific token pair."""
    # Extract base and quote tokens from the token pair
    base_token, quote_token = token_pair.split(":")

    # Notify about the liquidity being added
    print_coloured(
        f"Adding liquidity for {token_pair} with baseQuantity: {base_quantity} {base_token} and quoteQuantity: {quote_quantity} {quote_token}",
        # f"Adding liquidity for {token_pair} with baseQuantity: {base_quantity:.4f} {base_token} and quoteQuantity: {quote_quantity:.4f} {quote_token}",
        Colours.GREEN,
    )

    # Use the Hive API to broadcast a transaction on Hive
    hive = Hive(keys=[private_active_key], node="https://api.hive.blog")

    # Payload for Hive-Engine addLiiquidity action
    payload = {
        "contractName": "marketpools",
        "contractAction": "addLiquidity",
        "contractPayload": {
            "tokenPair": token_pair,
            "baseQuantity": f"{base_quantity}",
            "quoteQuantity": f"{quote_quantity}",
        },
    }

    # Build and sign the transaction
    custom_json = Custom_json(
        id="ssc-mainnet-hive",
        required_auths=[account_name],
        required_posting_auths=[],
        json=json.dumps(payload),
    )

    # Broadcast the transaction on the Hive blockchain
    tx = TransactionBuilder(hive_instance=hive)
    tx.appendOps(custom_json)
    tx.appendWif(private_active_key)
    tx.sign()
    tx_id = tx.broadcast()
    print(
        f"Liquidity addition transaction broadcasted for {token_pair} with ID: {tx_id}"
    )


# Function to remove liquidity from a token pair
def remove_liquidity(account_name, private_active_key, token_pair, percentage):
    """Remove liquidity from a specific token pair."""
    print_coloured(
        f"Removing liquidity from {token_pair} with sharesOut: {percentage:.3f}",
        Colours.ORANGE,
    )
    hive = Hive(keys=[private_active_key], node="https://api.hive.blog")

    # Payload for Hive-Engine removeLiquidity action
    payload = {
        "contractName": "marketpools",
        "contractAction": "removeLiquidity",
        "contractPayload": {"tokenPair": token_pair, "sharesOut": f"{percentage:.3f}"},
    }

    # Build and sign the transaction
    custom_json = Custom_json(
        id="ssc-mainnet-hive",
        required_auths=[account_name],
        required_posting_auths=[],
        json=json.dumps(payload),
    )

    # Broadcast the transaction on the Hive blockchain
    tx = TransactionBuilder(hive_instance=hive)
    tx.appendOps(custom_json)
    tx.appendWif(private_active_key)
    tx.sign()
    tx_id = tx.broadcast()
    print(
        f"Liquidity removal transaction broadcasted for {token_pair} with ID: {tx_id}"
    )


# Function to load the configuration (pools to balance) from a JSON file
def load_config(file_path):
    """Load the pool configuration from a JSON file."""
    try:
        with open(file_path, "r") as f:
            config = json.load(f)
            print(f"Loaded configuration from {file_path}")
            return config
    except Exception as e:
        print(f"Error loading configuration: {e}")
        exit(1)


# Main function to balance liquidity across multiple token pairs
def balance_liquidity(account_name, private_key, pairs, session):
    """Balance the liquidity for all token pairs defined in the configuration."""

    # Extract all token pairs from the configuration file
    token_pairs = [pair_info["pair"] for pair_info in pairs]

    # Fetch liquidity information for all pairs at once (optimizes API calls)
    pools = get_filtered_pools(account_name, token_pairs, session)

    # Loop through each pair to check if rebalancing is needed
    for pair_info in pairs:
        token_pair = pair_info["pair"]
        target_balance = Decimal(pair_info["target_balance"])
        target_token = pair_info["target_token"]
        threshold = Decimal(pair_info["threshold"])

        # Find the relevant pool info for the current pair
        pool_info = next(
            (pool for pool in pools if pool["token_pair"] == token_pair), None
        )

        if pool_info:
            # Extract base and quote tokens and their precisions from the token pair
            base_token, quote_token = token_pair.split(":")
            base_precision = pool_info.get(
                "base_precision", 3
            )  # Default to 3 if not found
            quote_precision = pool_info.get(
                "quote_precision", 3
            )  # Default to 3 if not found

            # Determine current balances based on target_token
            if target_token == quote_token:
                current_balance = Decimal(pool_info["quote_balance"])
                source_balance = Decimal(pool_info["base_balance"])
            else:
                target_token = base_token
                current_balance = Decimal(pool_info["base_balance"])
                source_balance = Decimal(pool_info["quote_balance"])

            current_balance = current_balance.quantize(
                Decimal("0.000001")
            )  # Adjust to required precision

            # Notify about the current balance vs. target balance
            print(
                f"Balancing {token_pair}: Current {target_token} balance is {current_balance}, Target is {target_balance}"
            )

            # Determine if we need to add or remove liquidity
            if current_balance > target_balance + threshold:
                # Remove excess liquidity
                excess = current_balance - target_balance
                print_coloured(
                    f"Excess: {excess} {target_token} - Removing liquidity",
                    Colours.ORANGE,
                )
                percentage = (excess / current_balance * Decimal(100)).quantize(
                    Decimal("0.001"), rounding=ROUND_DOWN
                )
                remove_liquidity(account_name, private_key, token_pair, percentage)

            elif current_balance < target_balance - threshold:
                # Add liquidity to make up for the deficit
                deficit = target_balance - current_balance
                print_coloured(
                    f"Deficit: {deficit} {target_token} - Adding liquidity",
                    Colours.GREEN,
                )

                # Calculate the corresponding token quantities needed
                if target_token == quote_token:
                    base_quantity = (
                        deficit * source_balance / current_balance
                    ).quantize(
                        Decimal(f'0.{"0" * base_precision}'), rounding=ROUND_DOWN
                    )
                    quote_quantity = deficit.quantize(
                        Decimal(f'0.{"0" * quote_precision}'), rounding=ROUND_DOWN
                    )
                else:
                    quote_quantity = (
                        deficit * source_balance / current_balance
                    ).quantize(
                        Decimal(f'0.{"0" * quote_precision}'), rounding=ROUND_DOWN
                    )
                    base_quantity = deficit.quantize(
                        Decimal(f'0.{"0" * base_precision}'), rounding=ROUND_DOWN
                    )

                add_liquidity(
                    account_name, private_key, token_pair, base_quantity, quote_quantity
                )

            # If the balance is within the threshold, no action is needed
            else:
                print_coloured(
                    f"No change needed for {token_pair}. Balance is within the threshold.",
                    Colours.CYAN,
                )

        else:
            print(f"No liquidity information found for {token_pair}.")


if __name__ == "__main__":
    start = time.time()
    # Load Hive-Engine nodes
    load_nodes()

    # Argument parsing for account name
    parser = argparse.ArgumentParser(description="Liquidity Balancer Script")
    parser.add_argument("account", nargs="?", help="Specify the account name")

    args = parser.parse_args()
    account_name = args.account

    # Ask for account if not provided as argument
    if not account_name:
        account_name = input("Please enter an account name: ")

    account_private_key = f"{account_name}.private_active_key.txt"
    account_config_file = f"{account_name}.pools_to_balance.json"

    # Fetch a working node to verify connectivity
    node_url = get_node()
    if not node_url:
        print("No Hive-Engine nodes are available. Exiting...")
        exit(1)

    # Fetch private active key
    private_active_key = read_private_key(account_private_key)

    # Load the pool configuration
    pairs_to_balance = load_config(account_config_file)

    end = time.time()
    time_difference = end - start
    print_coloured(f"Initialization took {time_difference} seconds.", Colours.GREEN)

    while True:  # Loop to run indefinitely
        start = time.time()
        check_interval = random.randint(300, 900)  # 5 to 15 minutes in seconds
        # Create the session
        with requests.Session() as session:
            balance_liquidity(
                account_name, private_active_key, pairs_to_balance, session
            )

        end = time.time()
        elapsed_time = end - start
        print_coloured(f"Balancing took {elapsed_time} seconds.\n", Colours.GREEN)
        # Wait for the specified interval before the next check
        print(f"Going to sleep for {check_interval // 60} minutes...")
        time.sleep(check_interval)

fetch_liquidity_pools.py:

# fetch_liquidity_pools.py
import json
import os
import argparse
import requests

# from time import sleep
import time
from random import choice  # To randomly choose a node from the list

# Hive-Engine API Node
# HIVE_ENGINE_NODE = 'https://api2.hive-engine.com/rpc/contracts'
NODES_FILE = "nodes.json"
retry_delay = 5  # seconds to wait between retries
max_retries = 3  # Maximum number of retries

# Default values
DEFAULT_ACCOUNT_NAME = "hive-engine"  # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = (
    "BTC"  # Replace with the desired token to filter, or use 'ALL' to list all tokens
)

# File to store token details with precision
TOKEN_CACHE_FILE = "token_details_cache.json"
cached_token_details = {}
hive_engine_nodes = []

# Declare a global variable to store the selected node
selected_node = None


def load_nodes():
    global hive_engine_nodes
    # Check if the nodes file exists
    if os.path.exists(NODES_FILE):
        try:
            with open(NODES_FILE, "r") as f:
                hive_engine_nodes = json.load(f)
                print("Loaded Hive-Engine nodes from file.")
        except (ValueError, IOError):
            print(
                "Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes."
            )
    else:
        print(
            "Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes."
        )
        hive_engine_nodes = []  # Ensure nodes list is empty on error


def get_node():
    global selected_node
    if selected_node:
        return selected_node
    # Choose a random node from the list
    if hive_engine_nodes:
        selected_node = choice(hive_engine_nodes)
        print(f"Using Hive-Engine node: {selected_node}")  # Print the current node
        return selected_node
    else:
        print("Error: No Hive-Engine nodes available.")
        return None


def load_token_cache():
    global cached_token_details

    print(
        "Attempting to load the token cache..."
    )  # Add this to ensure the function is being called

    # Check if the token cache file exists
    if os.path.exists(TOKEN_CACHE_FILE):
        try:
            with open(TOKEN_CACHE_FILE, "r") as f:
                cached_token_details = json.load(f)
                print(
                    f"Loaded {len(cached_token_details)} cached token details from file."
                )
                # Pretty print the loaded cache for debugging purposes
                # print(json.dumps(cached_token_details, indent=4))

        except (ValueError, IOError) as e:
            print(
                "Error: Failed to load token cache file: {e}. Starting with an empty cache."
            )
    else:
        print(
            f"Error: {TOKEN_CACHE_FILE} does not exist. Starting with an empty cache."
        )


""" Following functions will run and initialize the nodes and token cache """
# Load nodes from the external file
load_nodes()
# Load cached token details
load_token_cache()


def save_token_cache():
    # Save the current token details cache to a file in a human-readable format
    try:
        with open(TOKEN_CACHE_FILE, "w") as f:
            json.dump(
                cached_token_details, f, indent=4
            )  # Add 'indent=4' for readability
            print("Saved token details to cache file.")
    except IOError:
        print("Error: Failed to save token cache file.")


def fetch_token_details(symbol, session):
    # Check if token details are already cached
    if symbol in cached_token_details:
        print(f"Token details for {symbol} found in cache.")
        return cached_token_details[symbol]

    print(f"Token details for {symbol} not found in cache.")
    print(f"Fetching token details for {symbol}...")

    # Fetch token details for the given symbol
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "tokens",
                "table": "tokens",
                "query": {"symbol": symbol},
                "limit": 1,
            },
        }

        try:
            response = session.post(url, json=payload)

            # print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")

            if response.status_code == 200:
                try:
                    data = response.json()
                except ValueError:
                    print("Error: Failed to parse JSON response.")
                    return {}

                if "result" in data and data["result"]:
                    cached_token_details[symbol] = data["result"][
                        0
                    ]  # Cache the token details
                    save_token_cache()  # Save cache after updating
                    return data["result"][0]
                else:
                    print(f"No details found for {symbol}.")

            elif response.status_code == 503:
                print(
                    f"Error: Failed to fetch token details for {symbol}. Status Code: 503 (Service Unavailable)."
                )
                #  Retry after a delay, with an increasing backoff
                time.sleep(retry_delay * (attempt + 1))
                continue

            print(
                f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}"
            )

        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {symbol}: {e}")

        # If the final attempt fails, return empty details
        if attempt == max_retries - 1:
            print(f"Max retries exceeded for {symbol}. Skipping.")
            time.sleep(retry_delay)

    return {}


def fetch_pool_details(token_pair):
    # Fetch details of the specified liquidity pool
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            print("Error: No node URL available, exiting fetch_pool_details.")
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {"tokenPair": token_pair},
                "limit": 1,
            },
        }

        print(
            f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}..."
        )  # Debugging statement

        try:
            response = session.post(
                url, json=payload, timeout=10
            )  # Set a timeout for the request
            # print(
            #     f"Received response status code: {response.status_code} for {token_pair} from {url}"
            # )

            if response.status_code == 200:
                try:
                    data = response.json()
                    print(
                        f"Data received for {token_pair}: {data}"
                    )  # Debugging the received data
                    if "result" in data and data["result"]:
                        print(f"Successfully fetched pool details for {token_pair}")
                        return data["result"][0]
                    else:
                        print(
                            f"Unexpected response format or empty result for {token_pair} from {url}: {data}"
                        )
                except ValueError as e:
                    print(f"Error: Failed to parse JSON response: {e}.")
                    # print(f"Response content: {response.text}") # Print the actual response content
            else:
                print(
                    f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}"
                )
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {token_pair} from {url}: {e}")

        # Handle retries
        if attempt < max_retries - 1:
            print(f"Retrying after {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {token_pair}. Skipping to next.")

    print(f"Returning empty details for {token_pair} after all attempts.")
    return {}


def fetch_all_pools(session):
    attempts = 0
    max_atetmpts = len(hive_engine_nodes)
    while attempts < max_atetmpts:
        url = get_node()
        if not url:
            return []

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {},
                "limit": 1000,  # Adjust limit based on how many pools exist
            },
        }

        try:
            response = session.post(url, json=payload, timeout=10)
            if response.status_code == 200:
                data = response.json()
                return data.get("result", [])
            else:
                print(
                    f"Error: Failed to fetch all pools. Status Code: {response.status_code}"
                )
                # return []
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}. Trying a different node.")
            # return []

        # Rotate to another node if an error occurs
        attempts += 1

    print("Max attempts reached. Unable to fetch pool details.")
    return []


def fetch_liquidity_positions(account_name, session, retries=5, backoff_factor=5):
    # Fetch liquidity positions for the given account
    url = get_node()
    if not url:
        return {}

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "liquidityPositions",
            "query": {"account": account_name},
            "limit": 1000,
        },
    }

    for attempt in range(retries):
        try:
            response = session.post(url, json=payload, timeout=10)
            # print("Response Status Code: ", response.status_code)

            # Print the entire raw response text for debugging purposes
            # print("Raw response text: ", response.text)

            if response.status_code == 200:
                try:
                    data = response.json()
                    return data.get("result", [])
                except ValueError:
                    print("Error: Failed to parse JSON response.")
                    return []
            else:
                print(
                    f"Error: Failed to fetch data. Status Code: {response.status_code}"
                )
                return []

        except requests.exceptions.ConnectionError as e:
            print(
                f"Attempt {attempt + 1}: Connection error: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.Timeout as e:
            print(
                f"Attempt {attempt + 1}: Request timed out: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1}: An error occurred: {e}")
            return []

    print(f"Max retries exceeded. Could not fetch data for account: {account_name}")
    return []


def get_filtered_pools(account_name, token_pairs, session):
    """Fetch liquidity information for multiple token pairs."""
    # Fetch all pools in one go
    all_pools = fetch_all_pools(session)  # This fetches all pools in one go

    # Check if pools were fetched successfully
    if not all_pools:
        print("Error: Failed to fetch all pools.")
        return []

    pool_dict = {pool["tokenPair"]: pool for pool in all_pools}

    filtered_pools = []

    # Fetch positions for all token pairs at once
    positions = fetch_liquidity_positions(account_name, session)

    # Debug: Check fetched positions
    print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")

    if not positions:
        print("No liquidity positions found for this account.")
        return []

    # If token_pairs is empty (e.g., 'ALL'), return all pools
    if not token_pairs:
        token_pairs = pool_dict.keys()

    for token_pair in token_pairs:
        pool_info = pool_dict.get(token_pair)
        if pool_info:
            position = next(
                (pos for pos in positions if pos["tokenPair"] == token_pair), None
            )
            if position:
                base_symbol, quote_symbol = token_pair.split(":")
                # Fetch token details (for precision) for the base and quote tokens
                base_token_details = fetch_token_details(base_symbol, session)
                quote_token_details = fetch_token_details(quote_symbol, session)

                base_precision = base_token_details.get("precision", 0)
                quote_precision = quote_token_details.get("precision", 0)

                # Calculate balances
                shares = float(position.get("shares", "0"))
                base_quantity = float(pool_info.get("baseQuantity", "0"))
                quote_quantity = float(pool_info.get("quoteQuantity", "0"))
                total_shares = float(pool_info.get("totalShares", "0"))

                if total_shares > 0:
                    user_base_balance = (shares / total_shares) * base_quantity
                    user_quote_balance = (shares / total_shares) * quote_quantity

                    filtered_pools.append(
                        {
                            "token_pair": token_pair,
                            "base_symbol": base_symbol,
                            "quote_symbol": quote_symbol,
                            "base_balance": user_base_balance,
                            "quote_balance": user_quote_balance,
                            "base_precision": base_precision,
                            "quote_precision": quote_precision,
                        }
                    )

    # Debug: Print the number of filtered pools
    print(f"Number of filtered pools: {len(filtered_pools)}")

    return filtered_pools


def main(account_name, filter_token):
    # Create a session object
    with requests.Session() as session:
        # If filtering by 'ALL', fetch everything, otherwise pass the token as a list
        token_pairs = [filter_token] if filter_token.upper() != "ALL" else []

        # Fetch and print filtered pools
        pools = get_filtered_pools(account_name, token_pairs, session)
        print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
        for pool in pools:
            print(
                f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
                f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}"
            )

        # Debug: If no pools were printed
        if not pools:
            print("No matching liquidity pools found for the given filter.")


if __name__ == "__main__":
    # When run as a standalone script
    session = requests.Session()
    try:
        parser = argparse.ArgumentParser(
            description="Fetch Hive-Engine liquidity pools."
        )
        parser.add_argument(
            "account_name",
            nargs="?",
            default=DEFAULT_ACCOUNT_NAME,
            help="Hive account name to fetch liquidity pools for.",
        )
        parser.add_argument(
            "filter_token",
            nargs="?",
            default=DEFAULT_FILTER_TOKEN,
            help="Token to filter by, or 'ALL' to list all tokens.",
        )

        args = parser.parse_args()

        main(args.account_name, args.filter_token)
    finally:
        session.close()

ToDo:

LiquidityBalancer.py:

  1. Reserve token handling capability
  2. Multiple accounts functionality
  3. Cross-account transfers
  4. Better colours for output

fetch_liquidity_pools.py

  1. Coloured output
  2. Multiple token input from cli?

Both:

  1. Integration with other tools
  2. Better error handling considering edge cases
    and other external factors.
  3. Refactoring and/or modularizing functionality
  4. Further optimizations

My Thoughts on This Project:

I wouldn’t call myself a programmer just yet, but diving deep into this project has been an eye-opening journey. Along the way, I’ve uncovered just how much I didn’t know — and sometimes, the discoveries have truly blown my mind. Understanding the connection between Hive and Hive-Engine, learning how APIs function, and realizing the inner workings of a blockchain like Hive has been both humbling and exciting.

It’s fascinating to look back and see how little I knew about what it truly meant to code for Hive. I’ve gone from being completely ignorant of the mechanics behind blockchains and sidechains to now having a working grasp of how everything fits together. And even though I still have plenty to learn, this project has shown me that every small step leads to a much larger understanding.

I think I might stay on this path for a while longer.

Sort:  

This is beyond impressive 😅 no surprise you had to work on it for quite some time...

I wouldn’t call myself a programmer just yet

It always makes me a bit sad reading that something like that is still not enough to be called a programer... it looks so amazing, difficult to realize and almost unreachable for me, so it makes me question if I will ever become decent at coding 😅 I guess not !LOL (a sad lol!)

Why did the art thief's vehicle run out of fuel?
He had no Monet to buy Degas to make the Van Gogh

Credit: reddit
@gamer00, I sent you an $LOLZ on behalf of arc7icwolf

(2/10)
Delegate Hive Tokens to Farm $LOLZ and earn 110% Rewards. Learn more.

Hey, @Arc7icWolf, thanks a ton! 😊

I totally get what you’re saying — I’ve been there too, doubting if I could ever call myself a programmer. (Edit: Still find myself there occasionally...) But trust me, all of this was just small steps and learning through mistakes. What you’re seeing now is the result of lots of trial and error (and a few headaches along the way).

The truth is, I still feel like I’m learning every day, and I bet every programmer out there feels the same. We all start from the same place — not knowing anything — but I guess the key is to keep at it. I’ve seen the ideas you’ve been working on, and I know you’re well on your way.

The fact that you’re passionate about learning means you’re already on the right track. Keep pushing, and one day you’ll look back at this and realize just how far you’ve come! Also, that I wasn't after all that far ahead... In some respects, I am looking at you from behind.

!PIZZA !WINE

I'll keep doing my best to learn new stuff :) and hopefully one day I'll be able to code a bit more properly 😅

In some respects, I am looking at you from behind

Only if we talk about errors or questions asked to ChatGPT, otherwise I'm the only one looking from behind !LOL

Now something I'd like to start understanding a bit more are databases and how to create, manage and access them.

I know, for example, that in your script you save in a cache some info about the fetched tokens (so you don't have to fetch them again each time you run the script), and I was asking myself: can be this considered as some sort of database? And if you would like to create a more complex one, you could use something like Pandas, right? And if wont to get even more complex, this is where you should consider using SQL?

Am I getting the steps right or I'm mixing different things together? 😅

!PIZZA

How many jazz musicians does it take to replace a lightbulb?
A-one, a-two, a one-two-three-four!

Credit: reddit
@gamer00, I sent you an $LOLZ on behalf of arc7icwolf

(1/10)
Delegate Hive Tokens to Farm $LOLZ and earn 110% Rewards. Learn more.

Glad you asked! I’ve worked with SQL, so I’ve got some knowledge to share.

On a computer, pretty much everything is some form of a database. It can be as simple as a list object in Python, a static file on a disk (like we do with the token cache), or even a blockchain! Each of these stores data in different ways. SQL databases are more structured and designed for serving large collections of data to clients — for example, websites or accounting software. They’re like a more formal system for managing and retrieving data efficiently.

You mentioned Pandas — but since I don't personally know much of them, I had to ask ChatGPT in my own turn... and yes, Pandas seems to be a great tool for handling data in Python, especially when working with data frames (like big tables of data in memory). But it’s not quite a database in the traditional sense. It's is more of a tool for manipulating and analyzing data once it’s already loaded into your program. If you wanted to persist that data across sessions or handle really large datasets, then you’d want to consider something like SQL.

SQL databases can handle large, complex queries efficiently, and allow you to store, retrieve, update, and delete data as needed. Pandas can help you work with data in your code, but it doesn't have the persistence of a proper database unless you save it to a file.

So you’ve got the right idea! Pandas can be cool for working with data in your code, but if you need something more powerful or persistent, SQL is the next step.

!WINE

No idea how I missed your reply, sorry 😅

Luckily I came back to read again your script and, while scrolling through the comments, I noticed one I hadn't seen before!

Thanks for the answer :)

So, databases are a bit too much for me, that's for sure, but if I ever need to manage data in a persistent way, they are the right tool. Gotcha!

!PIZZA !LOL !HUG

Did you hear about the cat who ate a ball of yarn?
She had mittens.

Credit: marshmellowman
@gamer00, I sent you an $LOLZ on behalf of arc7icwolf

(2/10)
Delegate Hive Tokens to Farm $LOLZ and earn 110% Rewards. Learn more.

I wonder if I should rename the script to "Liquidity Reverse Balancer" instead, now that I think of it. Since it doesn't really "balance" anything, quite contrary. And my 'explanation' was bit silly too. The script is basically a "stock" enhancer, as its main purpose is to target price reversals by advance stocking the other side of the pair in anticipation of one. It took me quite a while to fully understand what I had created.

Oh well, when we talk about trading I'm even more clueless than with coding 😅

There is a difference though. Trading comes with fees (and risk), but manipulating liquidity pools doesn't have any, if you deal with small margins and a large enough reserve to handle the changes.

I may try it with a small pool where I already have some liquidity in.

Putting it to the test will also help me better understand how it works.

!PIZZA

Good luck! :) Be sure to have some reserve in your wallet. The script tends to die sometimes if tokens run out when adding liquidity. (I used to have an error handler for it, but I forgot to add it back when I rewrote the thing for the n'th time.)

I didn't know you were a tech guy, this coding language is one I can never understand but I can understand that you have actually done a great job.

Thanks so much for the kind words! I can definitely understand how coding can seem like a foreign language at times, but I’m really, really happy that you think it’s a great job! 😊 If you ever get curious about any of it, I’d be more than happy to explain (or at least try to make it sound less mysterious, haha).

!WINE

I will definitely keep that in view 😊

Lol I was bored scrolling down. How difficult it would be to write entire code!😳 Tech guys are genius and got to have patient ofcourse.

Your post will scare a guy who is thinking to be a Tech guy.😅

Haha, I get it! 😅 Coding can seem like a maze at first, but it’s not all as intimidating as it looks — I promise! Most of it is small steps, lots of trial and error, and a bit of tea-fueled patience. 🍵😉 (Or coffee, if you prefer.)

And hey, if someone’s still brave enough to become a tech person after reading my post, they’re already halfway there!

Thanks for sticking with it though! 😄

PIZZA!
The Hive.Pizza team manually curated this post.

$PIZZA slices delivered:
@arc7icwolf(1/10) tipped @gamer00 (x3)
gamer00 tipped arc7icwolf

You can now send $PIZZA tips in Discord via tip.cc!