import logging # For logging messages
import configparser # For reading configuration file
import os # For working with file system
import pandas as pd # For working with data in tabular format
import numpy as np # For numerical operations
import sys # For system parameters
import time # For delays
from joblib import dump, load # For saving/loading models
from collections import Counter # For counting classes
from imblearn.over_sampling import SMOTE # For oversampling classes
from sklearn.model_selection import train_test_split # For splitting data
from sklearn.ensemble import RandomForestClassifier # Random Forest model
from sklearn.metrics import accuracy_score, classification_report # Evaluation metrics
import ccxt # Library for interacting with exchanges
# Setting up logging: timestamped messages at DEBUG level to the console (basicConfig here adds no file handler)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_application_path():
"""Determines the path to the executable file or script."""
    if getattr(sys, 'frozen', False): # Bundled executable (PyInstaller sets sys.frozen)
return os.path.dirname(sys.executable)
elif '__file__' in globals(): # If running as .py script
return os.path.dirname(os.path.abspath(__file__))
else: # If running in interpreter (REPL, Jupyter)
return os.getcwd()
application_path = get_application_path()
logger.info(f"Application path: {application_path}")
# Reading configuration file settings.ini
config = configparser.ConfigParser()
config.read(os.path.join(application_path, "settings.ini"), encoding="utf-8")
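# An illustrative settings.ini layout (section and key names are inferred from the
# config reads below; the values are placeholders):
#
#   [BINANCE]
#   api_key = YOUR_API_KEY
#   secret = YOUR_SECRET
#
#   [SETTINGS_PAIR]
#   pair = BTC-USDT
#   count_day = 30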
try:
# Initializing Binance exchange via ccxt (assuming API keys in config if needed)
binance = ccxt.binance({
'apiKey': config.get('BINANCE', 'api_key', fallback=''), # If keys are needed
'secret': config.get('BINANCE', 'secret', fallback=''),
})
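    # Note: fetching OHLCV candles is a public Binance endpoint, so empty API keys
    # are sufficient for this script; keys are only needed for private account/trading calls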
    # Trading pair, read from the config; dash-separated format, e.g. 'BTC-USDT'
    pair = config.get('SETTINGS_PAIR', 'pair', fallback='BTC-USDT')
if binance.has['fetchOHLCV']:
logger.debug(f"binance 1h {pair}")
logger.debug(f"ms {int(binance.milliseconds())}")
# Path to the model
        model_path = os.path.join(application_path, "models", f"crypto_model_1h_{pair.replace('-', '')}.joblib")
        if os.path.exists(model_path):
            count_day = config.getint('SETTINGS_PAIR', 'count_day') # Days of recent history to fetch when a model already exists
        else:
            count_day = 0
            logger.info(f"Model for pair {pair.replace('-', '')} not found! Fetching candles from the start of trading.")
# Calculating start date for data loading
if count_day != 0:
            since = int(binance.milliseconds()) - 86400000 * count_day # N days back from now (86,400,000 ms per day)
else:
since = 0 # From the start of trading
klines = [] # List to store candles
# Loop to load OHLCV data in batches of 1000 candles
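        # Pagination pattern: each call returns at most `limit` candles starting at
        # `since`; advancing `since` just past the last returned timestamp walks
        # forward through history until the present is reached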
while since < int(binance.milliseconds()):
            candles = binance.fetch_ohlcv(
                symbol=pair.replace('-', '/'), # ccxt uses slash-separated symbols, e.g. 'BTC/USDT'
                since=since,
                limit=1000,
                timeframe='1h'
            )
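            # Each row returned by ccxt is [timestamp_ms, open, high, low, close, volume],
            # e.g. [1704067200000, 42283.5, 42554.5, 42261.0, 42475.2, 1543.2] (values illustrative)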
            time.sleep(binance.rateLimit / 1000) # ccxt's rateLimit is in milliseconds; convert to seconds
            if len(candles):
                since = candles[-1][0] + 1 # Advance since just past the last candle's timestamp
                klines += candles
else:
break
        # Function to convert the raw candle list to a DataFrame
        # (renamed from fetch_ohlcv: it only formats data and would otherwise be confused with binance.fetch_ohlcv)
        def klines_to_dataframe(klines):
            df = pd.DataFrame(klines, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            return df
        data_1h = klines_to_dataframe(klines)
data_1h['timestamp'] = pd.to_datetime(data_1h['timestamp'], unit='ms') # Convert timestamp to datetime
data_1h.set_index('timestamp', inplace=True) # Set timestamp as index
logger.debug(f"data_df {data_1h}")
# Function to add technical indicators and features
def add_features(data_1h):
if data_1h.empty:
logger.debug("DataFrame is empty. Check data source.")
return data_1h
            # Candle anatomy as a percentage of the open price; shadows are measured
            # from the body edges (min/max of open and close), so the formulas also
            # hold for bearish candles, where close sits below open
            data_1h['lower_shadow'] = (data_1h[['open', 'close']].min(axis=1) - data_1h['low']) / data_1h['open'] * 100
            data_1h['upper_shadow'] = (data_1h['high'] - data_1h[['open', 'close']].max(axis=1)) / data_1h['open'] * 100
            data_1h['body_size'] = abs(data_1h['close'] - data_1h['open']) / data_1h['open'] * 100
            data_1h['volatility'] = (data_1h['high'] - data_1h['low']) / data_1h['open'] * 100
data_1h['volume_change'] = data_1h['volume'].pct_change().replace([np.inf, -np.inf], np.nan).fillna(0)
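            # pct_change yields ±inf when the previous volume is exactly 0, hence the
            # replace(inf, NaN) before zero-filling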
# Adding lagged features for shadows and volatility
for lag in range(1, 6):
data_1h[f'lag_{lag}_shadow'] = data_1h['lower_shadow'].shift(lag)
data_1h[f'lag_{lag}_volatility'] = data_1h['volatility'].shift(lag)
# Rate of Change (ROC) for 3 periods
data_1h['roc_3'] = data_1h['close'].pct_change(periods=3) * 100
            # Function to calculate RSI; gains/losses are built with clip() so the series
            # keeps its DatetimeIndex and aligns correctly when assigned back to data_1h
            # (a bare pd.Series over np.where output gets a RangeIndex and would
            # misalign on assignment, leaving the column all-NaN)
            def calculate_rsi(series, period=14):
                delta = series.diff()
                gain = delta.clip(lower=0) # Positive moves, zeros elsewhere
                loss = -delta.clip(upper=0) # Negative moves as positive values
                avg_gain = gain.rolling(window=period).mean()
                avg_loss = loss.rolling(window=period).mean()
                rs = avg_gain / avg_loss
                rsi = 100 - (100 / (1 + rs))
                return rsi
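            # Note: this rolling-mean RSI differs slightly from Wilder's original,
            # which smooths gains/losses exponentially rather than with a flat window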
data_1h['rsi_3'] = calculate_rsi(data_1h['close'], period=3)
# Lagged price differences
for lag in range(1, 6):
data_1h[f'diff_lag_{lag}'] = data_1h['close'] - data_1h['close'].shift(lag)
            # Volume relative to its 3-candle average (> 1 means above-average volume)
            data_1h['volume_cluster'] = data_1h['volume'] / data_1h['volume'].rolling(window=3).mean()
# Rolling statistics
data_1h['rolling_max'] = data_1h['high'].rolling(window=3).max()
data_1h['rolling_min'] = data_1h['low'].rolling(window=3).min()
data_1h['rolling_std'] = data_1h['close'].rolling(window=3).std()
# Trend indicators (SMA, EMA)
data_1h['sma_50'] = data_1h['close'].rolling(window=50).mean()
data_1h['sma_200'] = data_1h['close'].rolling(window=200).mean()
data_1h['ema_14'] = data_1h['close'].ewm(span=14, adjust=False).mean()
data_1h['price_sma50_diff'] = data_1h['close'] - data_1h['sma_50']
data_1h['price_sma200_diff'] = data_1h['close'] - data_1h['sma_200']
data_1h['price_ema_ratio'] = data_1h['close'] / data_1h['ema_14']
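            # Note: sma_200 is NaN for the first 199 rows until its window fills;
            # those leading NaNs end up zero-filled at the bottom of this function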
            # Volatility indicator: a simplified ATR (average high-low range only;
            # true ATR also accounts for gaps against the previous close)
            data_1h['atr'] = (data_1h['high'] - data_1h['low']).rolling(window=6).mean()
data_1h['volatility_ratio'] = data_1h['volatility'] / data_1h['atr'] # Improved: ratio to ATR instead of close
# Volume features
data_1h['volume_sma'] = data_1h['volume'].rolling(window=3).mean()
data_1h['volume_ratio'] = data_1h['volume'] / data_1h['volume_sma']
# Lagged differences
data_1h['price_diff_1'] = data_1h['close'] - data_1h['close'].shift(1)
data_1h['volume_diff_1'] = data_1h['volume'] - data_1h['volume'].shift(1)
            # Logging the feature columns and a data preview
            column_names = data_1h.columns.tolist()
            logger.debug(f"data_1h 1h columns: {column_names}")
            logger.debug(f"data_1h 1h features:\n{data_1h}")
            # Handling NaN: forward fill, then zeros for anything left at the start
            # (fillna(method='ffill') is deprecated in pandas 2.x; ffill() is the replacement)
            data_1h.ffill(inplace=True)
            data_1h.fillna(0, inplace=True)
return data_1h
# Function for training the model
def train_model(data_1h, model_path):
data_1h = add_features(data_1h)
            # Target: the close rose vs. the previous candle AND the next two closes
            # stay above the current close (a short sustained up-move)
            data_1h['target'] = ((data_1h['close'] > data_1h['close'].shift(1)) &
                                 (data_1h['close'].shift(-1) > data_1h['close']) &
                                 (data_1h['close'].shift(-2) > data_1h['close'])).astype(int)
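            # Worked example: for consecutive closes [100, 101, 102, 103] the candle
            # closing at 101 gets target=1 (101 > 100, 102 > 101, 103 > 101)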
            # shift(-1)/shift(-2) look past the end of the data, so the last two rows
            # carry unreliable labels; drop them. (Comparisons with NaN evaluate to
            # False, so 'target' itself never contains NaN and dropna would be a no-op.)
            data_1h = data_1h.iloc[:-2]
features = [col for col in data_1h.columns if col not in ['target']]
X = data_1h[features]
y = data_1h['target']
logger.debug(f"Class distribution: {Counter(y)}")
# Checking for sufficient data
if len(Counter(y)) < 2 or Counter(y).get(1, 0) < 5:
logger.warning("Insufficient data for training.")
return None
            # Splitting into train/test chronologically (shuffle=False): a random split
            # would leak future information into training through the lagged features
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
# Oversampling with SMOTE for class balance
smote = SMOTE(random_state=42)
X_train_res, y_train_res = smote.fit_resample(X_train, y_train)
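            # SMOTE synthesizes new minority-class rows by interpolating between each
            # minority sample and its nearest minority-class neighbours in feature
            # space, so the resampled training set has balanced classes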
# Training the model
model = RandomForestClassifier(n_estimators=10000, random_state=42)
model.fit(X_train_res, y_train_res)
# Predictions and metrics
y_pred = model.predict(X_test)
logger.debug(f"Accuracy: {accuracy_score(y_test, y_pred)}")
logger.debug(f"Classification Report:\n{classification_report(y_test, y_pred)}")
            # Saving the model (create the models/ directory first, otherwise dump() raises FileNotFoundError)
            os.makedirs(os.path.dirname(model_path), exist_ok=True)
            dump(model, model_path)
logger.info(f"Model saved to {model_path}")
return model
# Function for predicting entry probability
def predict_entry_point(model, data_1h):
            data_1h = add_features(data_1h) # Ensure features exist (recomputed from the base OHLCV columns, so re-running is safe)
features = [col for col in data_1h.columns if col not in ['target']]
latest_data = data_1h[features].iloc[-1:]
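            # predict_proba returns one row per sample with one column per class;
            # [:, 1] selects the probability of class 1 (the sustained up-move label)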
probability = model.predict_proba(latest_data)[:, 1][0]
logger.debug(f"Probability of long entry point: {probability:.2%}")
return probability
# Main logic
if data_1h.empty:
logger.debug("DataFrame is empty, skipping training")
else:
            model_path = os.path.join(application_path, "models", f"crypto_model_1h_{pair.replace('-', '')}.joblib")
if os.path.exists(model_path):
model = load(model_path)
logger.info(f"Model for pair: {pair.replace('-', '')} loaded.")
else:
model = train_model(data_1h, model_path)
if model:
prob = predict_entry_point(model, data_1h)
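                # 0.7 is a tunable confidence cutoff: raising it trades signal frequency for precision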
if prob > 0.7:
logger.info(f"Found long entry point with probability: {round(prob * 100, 2)}%.")
else:
logger.info(f"Long entry point for pair: {pair.replace('-', '')} not found. Probability: {round(prob * 100, 2)}%")
except Exception as error:
logger.exception(error)