AI Trading Bot: Developing an Algorithm for Long Trades in Python Using CCXT (Machine Learning for Trading: Predicting Trend Reversals in the Crypto Market)
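
Below is the complete script. It pulls hourly OHLCV candles for a configured pair from Binance via ccxt, engineers candle-shape and indicator features, trains a RandomForestClassifier on a "sustained up-move" target (balancing the classes with SMOTE), persists the model with joblib, and logs the probability that the latest candle is a long entry point.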

import logging # For logging messages
import configparser # For reading configuration file
import os # For working with file system
import pandas as pd # For working with data in tabular format
import numpy as np # For numerical operations
import sys # For system parameters
import time # For delays
from joblib import dump, load # For saving/loading models
from collections import Counter # For counting classes
from imblearn.over_sampling import SMOTE # For oversampling classes
from sklearn.model_selection import train_test_split # For splitting data
from sklearn.ensemble import RandomForestClassifier # Random Forest model
from sklearn.metrics import accuracy_score, classification_report # Evaluation metrics
import ccxt # Library for interacting with exchanges
# Setting up logging to output messages to the console (no file handler is configured)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_application_path():
    """Determines the path to the executable file or script."""
    if getattr(sys, 'frozen', False): # If running as .exe
        return os.path.dirname(sys.executable)
    elif '__file__' in globals(): # If running as .py script
        return os.path.dirname(os.path.abspath(__file__))
    else: # If running in interpreter (REPL, Jupyter)
        return os.getcwd()
application_path = get_application_path()
print("Application path:", application_path)
# Reading configuration file settings.ini
config = configparser.ConfigParser()
config.read(os.path.join(application_path, "settings.ini"), encoding="utf-8")
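
# Illustrative settings.ini layout (section and key names taken from the code;
# the values are placeholders, not real credentials):
#
# [BINANCE]
# api_key = YOUR_API_KEY
# secret = YOUR_SECRET
#
# [SETTINGS_PAIR]
# pair = BTC-USDT
# count_day = 30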
try:
    # Initializing Binance exchange via ccxt (keys are only needed for private endpoints)
    binance = ccxt.binance({
        'apiKey': config.get('BINANCE', 'api_key', fallback=''),
        'secret': config.get('BINANCE', 'secret', fallback=''),
        'enableRateLimit': True, # Let ccxt throttle requests automatically
    })

    # Trading pair, read from the config; dash-separated, e.g. 'BTC-USDT'
    pair = config.get('SETTINGS_PAIR', 'pair', fallback='BTC-USDT')
    if binance.has['fetchOHLCV']:
        logger.debug(f"binance 1h {pair}")
        logger.debug(f"ms {int(binance.milliseconds())}")

        # Path to the model
        model_path = os.path.join(application_path, f"models/crypto_model_1h_{pair.replace('-', '')}.joblib")

        if os.path.exists(model_path):
            count_day = config.getint('SETTINGS_PAIR', 'count_day', fallback=0)
        else:
            count_day = 0
            logger.info(f"Model for pair {pair.replace('-', '')} not found! Fetching candles from the start of trading.")

        # Calculating start date for data loading
        if count_day != 0:
            since = int(binance.milliseconds()) - 86400000 * count_day # -N days from current time
        else:
            since = 0 # From the start of trading

        klines = [] # List to store candles

        # Page through the OHLCV history in batches of up to 1000 candles
        while since < int(binance.milliseconds()):
            batch = binance.fetch_ohlcv(
                symbol=pair.replace('-', '/'),
                timeframe='1h',
                since=since,
                limit=1000
            )
            time.sleep(binance.rateLimit / 1000) # Delay based on exchange rate limit

            if len(batch):
                since = batch[-1][0] + 1 # Advance past the last received candle
                klines += batch
            else:
                break
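
        # Optional safeguard (a sketch, not part of the original flow): if pages
        # ever overlap, duplicate candles would slip in, so key the list by open
        # time; this is a no-op when every page is disjoint.
        unique_klines = {k[0]: k for k in klines}
        klines = [unique_klines[t] for t in sorted(unique_klines)]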

        # Convert the raw candle list to a DataFrame (renamed from fetch_ohlcv,
        # which shadowed the ccxt method and did not actually fetch anything)
        def klines_to_dataframe(klines):
            df = pd.DataFrame(klines, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            return df

        data_1h = klines_to_dataframe(klines)
        data_1h['timestamp'] = pd.to_datetime(data_1h['timestamp'], unit='ms') # Convert timestamp to datetime
        data_1h.set_index('timestamp', inplace=True) # Set timestamp as index
        logger.debug(f"data_df {data_1h}")

        # Function to add technical indicators and features
        def add_features(data_1h):
            if data_1h.empty:
                logger.debug("DataFrame is empty. Check data source.")
                return data_1h

            # Candle anatomy in percent of the open (note: shadows are measured from
            # the open/close here rather than from max/min(open, close), a
            # simplification of the textbook definitions)
            data_1h['lower_shadow'] = (data_1h['open'] - data_1h['low']) / data_1h['open'] * 100
            data_1h['upper_shadow'] = (data_1h['high'] - data_1h['close']) / data_1h['open'] * 100
            data_1h['body_size'] = abs(data_1h['close'] - data_1h['open']) / data_1h['open'] * 100
            data_1h['volatility'] = (data_1h['high'] - data_1h['low']) / data_1h['open'] * 100
            data_1h['volume_change'] = data_1h['volume'].pct_change().replace([np.inf, -np.inf], np.nan).fillna(0)

            # Lagged features for the lower shadow and volatility
            for lag in range(1, 6):
                data_1h[f'lag_{lag}_shadow'] = data_1h['lower_shadow'].shift(lag)
                data_1h[f'lag_{lag}_volatility'] = data_1h['volatility'].shift(lag)

            # Rate of Change (ROC) for 3 periods
            data_1h['roc_3'] = data_1h['close'].pct_change(periods=3) * 100

            # Function to calculate RSI
            def calculate_rsi(series, period=14):
                delta = series.diff()
                gain = np.where(delta > 0, delta, 0)
                loss = np.where(delta < 0, -delta, 0)
                # Keep the original index: np.where returns bare arrays, and a
                # Series with a fresh RangeIndex would not align with the
                # DatetimeIndex when the result is assigned back to data_1h
                avg_gain = pd.Series(gain, index=series.index).rolling(window=period).mean()
                avg_loss = pd.Series(loss, index=series.index).rolling(window=period).mean()
                rs = avg_gain / avg_loss
                rsi = 100 - (100 / (1 + rs))
                return rsi

            data_1h['rsi_3'] = calculate_rsi(data_1h['close'], period=3)
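
            # Side note (a sketch, not used above): Wilder's original RSI smooths
            # gains and losses with an exponential average rather than a simple
            # rolling mean, e.g.:
            #   avg_gain = pd.Series(gain, index=series.index).ewm(alpha=1/period, adjust=False).mean()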

            # Lagged price differences
            for lag in range(1, 6):
                data_1h[f'diff_lag_{lag}'] = data_1h['close'] - data_1h['close'].shift(lag)

            # Volume clusters
            data_1h['volume_cluster'] = data_1h['volume'] / data_1h['volume'].rolling(window=3).mean()

            # Rolling statistics
            data_1h['rolling_max'] = data_1h['high'].rolling(window=3).max()
            data_1h['rolling_min'] = data_1h['low'].rolling(window=3).min()
            data_1h['rolling_std'] = data_1h['close'].rolling(window=3).std()

            # Trend indicators (SMA, EMA)
            data_1h['sma_50'] = data_1h['close'].rolling(window=50).mean()
            data_1h['sma_200'] = data_1h['close'].rolling(window=200).mean()
            data_1h['ema_14'] = data_1h['close'].ewm(span=14, adjust=False).mean()
            data_1h['price_sma50_diff'] = data_1h['close'] - data_1h['sma_50']
            data_1h['price_sma200_diff'] = data_1h['close'] - data_1h['sma_200']
            data_1h['price_ema_ratio'] = data_1h['close'] / data_1h['ema_14']

            # Volatility indicators: a simplified ATR (rolling mean of the high-low
            # range, ignoring gaps to the previous close)
            data_1h['atr'] = (data_1h['high'] - data_1h['low']).rolling(window=6).mean()
            data_1h['volatility_ratio'] = data_1h['volatility'] / data_1h['atr'] # Candle volatility relative to the average range

            # Volume features
            data_1h['volume_sma'] = data_1h['volume'].rolling(window=3).mean()
            data_1h['volume_ratio'] = data_1h['volume'] / data_1h['volume_sma']

            # Lagged differences
            data_1h['price_diff_1'] = data_1h['close'] - data_1h['close'].shift(1)
            data_1h['volume_diff_1'] = data_1h['volume'] - data_1h['volume'].shift(1)

            # Logging columns and data
            column_names = data_1h.columns.tolist()
            logger.debug(f"Headers data_1h 1h: {column_names}")
            logger.debug(f"Features data_1h 1h: {data_1h}")

            # Handling NaN: forward fill, then zeros
            data_1h.ffill(inplace=True)
            data_1h.fillna(0, inplace=True)

            return data_1h

        # Function for training the model
        def train_model(data_1h, model_path):
            data_1h = add_features(data_1h)

            # Target: the current close is above the previous one, and each of the
            # next two closes stays above the current close (a sustained up-move)
            data_1h['target'] = ((data_1h['close'] > data_1h['close'].shift(1)) &
                                 (data_1h['close'].shift(-1) > data_1h['close']) &
                                 (data_1h['close'].shift(-2) > data_1h['close'])).astype(int)
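            # Worked example of the rule on closes [1, 2, 3, 4, 2]:
            #   at close=2: 2 > 1, and the next closes 3 and 4 exceed 2 -> target = 1
            #   at close=3: 3 > 2 and 4 > 3, but the close after that is 2 < 3 -> target = 0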

            # shift(-1)/shift(-2) compare as False (not NaN) once cast to int, so the
            # last two rows would get a spurious 0 label; drop them explicitly
            data_1h = data_1h.iloc[:-2]

            features = [col for col in data_1h.columns if col not in ['target']]
            X = data_1h[features]
            y = data_1h['target']
            logger.debug(f"Class distribution: {Counter(y)}")

            # Checking for sufficient data
            if len(Counter(y)) < 2 or Counter(y).get(1, 0) < 5:
                logger.warning("Insufficient data for training.")
                return None

            # Splitting into train/test chronologically (no shuffling, to avoid look-ahead leakage)
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

            # Oversampling with SMOTE for class balance
            smote = SMOTE(random_state=42)
            X_train_res, y_train_res = smote.fit_resample(X_train, y_train)

            # Training the model
            model = RandomForestClassifier(n_estimators=10000, random_state=42)
            model.fit(X_train_res, y_train_res)

            # Predictions and metrics
            y_pred = model.predict(X_test)
            logger.debug(f"Accuracy: {accuracy_score(y_test, y_pred)}")
            logger.debug(f"Classification Report:\n{classification_report(y_test, y_pred)}")

            # Saving the model, creating the models/ directory on first use
            os.makedirs(os.path.dirname(model_path), exist_ok=True)
            dump(model, model_path)
            logger.info(f"Model saved to {model_path}")
            return model
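
        # Evaluation note (a sketch, not wired into the pipeline above): a single
        # chronological split scores only the most recent fold; sklearn's
        # TimeSeriesSplit gives a walk-forward estimate across several folds:
        #
        #   from sklearn.model_selection import TimeSeriesSplit
        #   for train_idx, test_idx in TimeSeriesSplit(n_splits=5).split(X):
        #       fold_model = RandomForestClassifier(n_estimators=500, random_state=42)
        #       fold_model.fit(X.iloc[train_idx], y.iloc[train_idx])
        #       logger.debug(accuracy_score(y.iloc[test_idx], fold_model.predict(X.iloc[test_idx])))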

        # Function for predicting entry probability
        def predict_entry_point(model, data_1h):
            data_1h = add_features(data_1h) # Recompute features so the latest row is complete
            features = [col for col in data_1h.columns if col not in ['target']]
            latest_data = data_1h[features].iloc[-1:]
            probability = model.predict_proba(latest_data)[:, 1][0]
            logger.debug(f"Probability of long entry point: {probability:.2%}")
            return probability

        # Main logic
        if data_1h.empty:
            logger.debug("DataFrame is empty, skipping training")
        else:
            if os.path.exists(model_path):
                model = load(model_path)
                logger.info(f"Model for pair: {pair.replace('-', '')} loaded.")
            else:
                model = train_model(data_1h, model_path)

            if model is not None:
                prob = predict_entry_point(model, data_1h)
                if prob > 0.7:
                    logger.info(f"Found long entry point with probability: {round(prob * 100, 2)}%.")
                else:
                    logger.info(f"Long entry point for pair: {pair.replace('-', '')} not found. Probability: {round(prob * 100, 2)}%")
except Exception as error:
    logger.exception(error)
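
On the first run for a pair no saved model exists, so the script downloads the full candle history, trains, and saves the model under models/. On subsequent runs it loads the saved model, fetches only the last count_day days of candles, and scores the latest candle against the 0.7 probability threshold.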