Creating a Crypto Bot Using CCXT (to capture the upper shadows of candles on a positive trend)

# bot.py
# This is a cryptocurrency trading bot for the Binance spot market using the ccxt library.
# The bot requests a list of spot pairs, filters by daily volume (>= 10M USDT), looks for pairs with a positive trend on 1h candles,
# selects top pairs by volume and trend strength, then checks for upper wick >3% from SMA3 on 1m chart.
# If the condition is met, enters a market buy, sets TP +1% and SL -3%.
# Manages concurrent trades based on config or deposit size.
# IMPORTANT: This is for educational purposes only. Trading involves risks. Use at your own risk.
# You need to provide your Binance API key and secret.
# Run the bot in a loop or with a scheduler, e.g., time.sleep().
import ccxt # Library for interacting with crypto exchanges
import pandas as pd # For data manipulation and SMA calculation
import time # For pauses and timing
from datetime import datetime # For logging timestamps
import logging # For logging actions
# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Configuration settings - adjust as needed
CONFIG = {
    'api_key': 'YOUR_BINANCE_API_KEY', # Replace with your Binance API key
    'api_secret': 'YOUR_BINANCE_API_SECRET', # Replace with your Binance secret
    'test_mode': True, # Set True for testnet (sandbox), False for real trading
    'min_volume_usdt': 10_000_000, # Minimum 24h volume in USDT to consider a pair
    'trend_lookback_hours': 4, # Number of 1h candles to check positive trend (avg % change > 0)
    'min_trend_strength': 0.5, # Minimum average % change over the period for positive trend (in %)
    'max_concurrent_trades': 5, # Maximum number of pairs for concurrent trading (overrides based on deposit if >0)
    'risk_per_trade_percent': 1.0, # % of total USDT balance at risk per trade (if max_concurrent_trades=0)
    'wick_threshold': 3.0, # % upper wick above SMA3 on 1m for entry
    'take_profit_percent': 1.0, # Fixed TP +% from entry price
    'stop_loss_percent': 3.0, # Fixed SL -% from entry price
    'poll_interval_seconds': 60, # Interval for checking opportunities (every minute)
    'base_quote': 'USDT', # Focus on pairs quoted in USDT
}
# Initialize exchange connection
def init_exchange():
    logger.info("Initializing connection to Binance")
    # Create ccxt.binance instance with API credentials
    exchange = ccxt.binance({
        'apiKey': CONFIG['api_key'],
        'secret': CONFIG['api_secret'],
        'enableRateLimit': True, # Enable rate limiting to avoid bans
    })
    # Set to testnet if test_mode is True
    if CONFIG['test_mode']:
        logger.info("Test mode (sandbox) enabled")
        exchange.set_sandbox_mode(True) # Use Binance testnet for simulation
    else:
        logger.info("Real trading mode enabled")
    return exchange
# Get all spot pairs quoted in USDT
def get_spot_usdt_pairs(exchange):
    logger.info("Loading all markets from the exchange")
    markets = exchange.load_markets()
    logger.info(f"Found {len(markets)} markets")
    # Filter active spot pairs ending with /USDT
    usdt_pairs = [symbol for symbol in markets if markets[symbol]['active'] and markets[symbol]['spot'] and symbol.endswith('/' + CONFIG['base_quote'])]
    logger.info(f"Found {len(usdt_pairs)} spot pairs with USDT")
    return usdt_pairs
# Filter pairs by 24h volume >= min_volume_usdt
def filter_by_volume(exchange, pairs):
    logger.info(f"Filtering {len(pairs)} pairs by volume")
    filtered_pairs = []
    # Fetch tickers for all pairs (batch request for efficiency)
    tickers = exchange.fetch_tickers(pairs)
    logger.info("Tickers fetched")
    for pair in pairs:
        ticker = tickers.get(pair)
        if ticker and 'quoteVolume' in ticker and ticker['quoteVolume'] >= CONFIG['min_volume_usdt']:
            filtered_pairs.append(pair)
    logger.info(f"After volume filter, {len(filtered_pairs)} pairs remain")
    return filtered_pairs
# Calculate positive trend strength on 1h candles
# Trend strength: average % change over the last N hours, must be > min_trend_strength
def get_trend_strength(exchange, pair):
    logger.info(f"Calculating trend for pair {pair}")
    # Fetch last N+1 1h OHLCV candles (open, high, low, close, volume)
    ohlcv = exchange.fetch_ohlcv(pair, '1h', limit=CONFIG['trend_lookback_hours'] + 1)
    if len(ohlcv) < CONFIG['trend_lookback_hours'] + 1:
        logger.warning(f"Insufficient data for {pair}")
        return 0 # Insufficient data
    # Convert to DataFrame for easy calculation
    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    # Calculate % change for each candle: (close - open) / open * 100
    df['pct_change'] = (df['close'] - df['open']) / df['open'] * 100
    # Average % change over the last N candles
    avg_change = df['pct_change'][-CONFIG['trend_lookback_hours']:].mean()
    trend = avg_change if avg_change > 0 else 0 # Only positive trends
    logger.info(f"Trend strength for {pair}: {trend}")
    return trend
# Get pairs with positive trend, sorted by volume and trend strength
def get_top_trending_pairs(exchange, pairs):
    logger.info(f"Searching top trending pairs among {len(pairs)}")
    pair_data = []
    tickers = exchange.fetch_tickers(pairs) # Fetch volumes again or cache if needed
    logger.info("Tickers for trending pairs fetched")
    for pair in pairs:
        trend = get_trend_strength(exchange, pair)
        if trend >= CONFIG['min_trend_strength']:
            volume = tickers[pair]['quoteVolume']
            pair_data.append({'pair': pair, 'trend': trend, 'volume': volume})
    # Sort by volume descending, then by trend descending
    pair_data.sort(key=lambda x: (x['volume'], x['trend']), reverse=True)
    top_pairs = [p['pair'] for p in pair_data]
    logger.info(f"Found {len(top_pairs)} top pairs with positive trend")
    return top_pairs
# Determine maximum trades based on config or deposit
def get_max_trades(exchange):
    if CONFIG['max_concurrent_trades'] > 0:
        max_trades = CONFIG['max_concurrent_trades']
        logger.info(f"Maximum concurrent trades from config: {max_trades}")
        return max_trades
    # Otherwise, based on deposit
    balance = exchange.fetch_balance()
    usdt_balance = balance['free'].get('USDT', 0)
    logger.info(f"Free USDT balance: {usdt_balance}")
    # Assume risking risk_per_trade_percent% per trade, each trade ~ entry size for min order
    # For simplicity: max_trades = int(100 / risk_per_trade_percent) # E.g., 1% risk allows 100 trades theoretically
    max_trades = max(1, int(100 / CONFIG['risk_per_trade_percent']))
    logger.info(f"Calculated maximum concurrent trades: {max_trades}")
    return max_trades
# Check upper wick on 1m chart: upper wick > 3% of SMA3
def check_upper_wick(exchange, pair):
    logger.info(f"Checking upper wick for {pair}")
    # Fetch last 4 1m candles (need 3 for SMA + current)
    ohlcv = exchange.fetch_ohlcv(pair, '1m', limit=4)
    if len(ohlcv) < 4:
        logger.warning(f"Insufficient data for upper wick {pair}")
        return False
    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    # SMA3 of closes from candles 0,1,2 (excluding current)
    sma3 = df['close'][:-1].mean() # Average of last 3 closes before current
    # Current candle
    current_open = df['open'].iloc[-1]
    current_high = df['high'].iloc[-1]
    current_close = df['close'].iloc[-1]
    # Upper wick: high - max(open, close)
    upper_wick = current_high - max(current_open, current_close)
    wick_percent = (upper_wick / sma3) * 100
    is_wick = wick_percent > CONFIG['wick_threshold']
    logger.info(f"Upper wick for {pair}: {wick_percent}%, condition: {is_wick}")
    return is_wick
# Enter trade: market buy, then set TP and SL as limit orders
def enter_trade(exchange, pair, amount):
    logger.info(f"Entering trade for {pair} with amount {amount}")
    # Market buy
    order = exchange.create_market_buy_order(pair, amount)
    entry_price = order['average'] # Average fill price
    logger.info(f"Entry price: {entry_price}")
    # Calculate TP and SL prices
    tp_price = entry_price * (1 + CONFIG['take_profit_percent'] / 100)
    sl_price = entry_price * (1 - CONFIG['stop_loss_percent'] / 100)
    # Amount for sell in TP/SL (assuming full position)
    position_amount = order['filled']
    # Create limit sell for TP
    exchange.create_limit_sell_order(pair, position_amount, tp_price)
    logger.info(f"Set TP at {tp_price}")
    # Create stop-limit sell for SL (ccxt supports stop orders via params)
    exchange.create_order(pair, 'stop_limit', 'sell', position_amount, sl_price, params={'stopPrice': sl_price})
    logger.info(f"Set SL at {sl_price}")
    print(f"{datetime.now()}: Entered trade on {pair} at {entry_price}, TP {tp_price}, SL {sl_price}")
# Get minimum order amount for pair and calculate based on balance
def get_trade_amount(exchange, pair, usdt_balance):
    logger.info(f"Calculating trade amount for {pair}")
    market = exchange.market(pair)
    min_amount = market['limits']['amount']['min']
    # Calculation based on risk: but since fixed SL%, risk = entry * (stop_loss_percent/100)
    # For simplicity, use balance portion
    portion_usdt = usdt_balance / get_max_trades(exchange) # Divide balance evenly
    ticker = exchange.fetch_ticker(pair)
    price = ticker['last']
    amount = portion_usdt / price
    trade_amount = max(min_amount, amount) # Ensure not less than minimum
    logger.info(f"Trade amount: {trade_amount}")
    return trade_amount
# Main bot loop
def main():
    exchange = init_exchange()
    active_trades = {} # Track active pairs: {pair: entry_time}
    while True:
        try:
            logger.info("Starting check cycle")
            # Step 1: Get all spot USDT pairs
            all_pairs = get_spot_usdt_pairs(exchange)
            # Step 2: Filter by volume
            volume_filtered = filter_by_volume(exchange, all_pairs)
            # Step 3: Get top trending pairs
            top_pairs = get_top_trending_pairs(exchange, volume_filtered)
            # Step 4: Determine max trades
            max_trades = get_max_trades(exchange)
            # Select top-N pairs, where N = max_trades - current active
            available_slots = max_trades - len(active_trades)
            candidates = top_pairs[:available_slots]
            logger.info(f"Available slots: {available_slots}, candidates: {len(candidates)}")
            # Step 5: Check upper wick and enter trades
            balance = exchange.fetch_balance()
            usdt_free = balance['free']['USDT']
            logger.info(f"Free USDT: {usdt_free}")
            for pair in candidates:
                if check_upper_wick(exchange, pair):
                    amount = get_trade_amount(exchange, pair, usdt_free)
                    enter_trade(exchange, pair, amount)
                    active_trades[pair] = time.time()
                    usdt_free -= amount * exchange.fetch_ticker(pair)['last'] # Approximate update of available balance
                    logger.info(f"Active trades: {len(active_trades)}")
            # Step 6: Check active trades (optional: close if needed, but here rely on TP/SL)
            # Simplified: remove after time or check balance
            time.sleep(CONFIG['poll_interval_seconds'])
            logger.info("End of cycle, pausing")
        except Exception as e:
            logger.error(f"Error: {e}")
            time.sleep(60) # Pause on error
if __name__ == "__main__":
    main()