# bot.py
# Это криптовалютный торговый бот для спотового рынка Binance с использованием библиотеки ccxt.
# Бот запрашивает список спотовых пар, фильтрует по суточному объему (>= 10M USDT), ищет пары с положительным трендом на 1h свече,
# выбирает топ-пары по объему и силе тренда, затем проверяет просадку >2% от SMA3 на 1m графике.
# Если условие выполнено, входит в маркет-бай, устанавливает TP +1% и SL -3%.
# Управляет одновременными сделками на основе конфига или размера депозита.
# ВАЖНО: Это только для образовательных целей. Торговля сопряжена с рисками. Используйте на свой страх и риск.
# Вам нужно предоставить ваш API-ключ и секрет Binance.
# Запускайте бота в цикле или с планировщиком, например, time.sleep().
import ccxt # Библиотека для взаимодействия с крипто-биржами
import pandas as pd # Для манипуляции данными и расчета SMA
import time # Для пауз и тайминга
from datetime import datetime # Для логирования временных меток
import logging # Для логирования действий
# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Конфигурационные настройки - настройте по необходимости
CONFIG = {
'api_key': 'YOUR_BINANCE_API_KEY', # Замените на ваш API-ключ Binance
'api_secret': 'YOUR_BINANCE_API_SECRET', # Замените на ваш секрет Binance
'test_mode': True, # Установите True для тестнета (sandbox), False для реальной торговли
'min_volume_usdt': 10_000_000, # Минимальный 24h объем в USDT для рассмотрения пары
'trend_lookback_hours': 4, # Количество 1h свечей для проверки положительного тренда (ср. % изменение > 0)
'min_trend_strength': 0.5, # Минимальное среднее % изменение за период для положительного тренда (в %)
'max_concurrent_trades': 5, # Максимальное количество пар для одновременной торговли (переопределяет на основе депозита, если >0)
'risk_per_trade_percent': 1.0, # % от общего баланса USDT на риск в одной сделке (если max_concurrent_trades=0)
'dip_threshold': 2.0, # % просадки ниже SMA3 на 1m для входа в сделку
'take_profit_percent': 1.0, # Фиксированный TP +% от цены входа
'stop_loss_percent': 3.0, # Фиксированный SL -% от цены входа
'poll_interval_seconds': 60, # Интервал проверки возможностей (каждую минуту)
'base_quote': 'USDT', # Фокус на парах с котировкой в USDT
}
# Инициализация соединения с биржей
def init_exchange():
logger.info("Инициализация соединения с Binance")
# Создание экземпляра ccxt.binance с API-credentials
exchange = ccxt.binance({
'apiKey': CONFIG['api_key'],
'secret': CONFIG['api_secret'],
'enableRateLimit': True, # Включение лимита запросов для избежания банов
})
# Установка в тестнет, если test_mode True
if CONFIG['test_mode']:
logger.info("Включен тестовый режим (sandbox)")
exchange.set_sandbox_mode(True) # Использование тестнета Binance для симуляции
else:
logger.info("Включен реальный торговый режим")
return exchange
# Получение всех спотовых пар с котировкой в USDT
def get_spot_usdt_pairs(exchange):
logger.info("Загрузка всех рынков с биржи")
markets = exchange.load_markets()
logger.info(f"Найдено {len(markets)} рынков")
# Фильтрация активных спотовых пар, заканчивающихся на /USDT
usdt_pairs = [symbol for symbol in markets if markets[symbol]['active'] and markets[symbol]['spot'] and symbol.endswith('/' + CONFIG['base_quote'])]
logger.info(f"Найдено {len(usdt_pairs)} спотовых пар с USDT")
return usdt_pairs
# Фильтрация пар по 24h объему >= min_volume_usdt
def filter_by_volume(exchange, pairs):
logger.info(f"Фильтрация {len(pairs)} пар по объему")
filtered_pairs = []
# Получение тикеров для всех пар (пакетный запрос для эффективности)
tickers = exchange.fetch_tickers(pairs)
logger.info("Тикеры получены")
for pair in pairs:
ticker = tickers.get(pair)
if ticker and 'quoteVolume' in ticker and ticker['quoteVolume'] >= CONFIG['min_volume_usdt']:
filtered_pairs.append(pair)
logger.info(f"После фильтрации по объему осталось {len(filtered_pairs)} пар")
return filtered_pairs
# Расчет силы положительного тренда на 1h свечах
# Сила тренда: среднее % изменение за последние N часов, должно быть > min_trend_strength
def get_trend_strength(exchange, pair):
logger.info(f"Расчет тренда для пары {pair}")
# Получение последних N+1 1h OHLCV свечей (open, high, low, close, volume)
ohlcv = exchange.fetch_ohlcv(pair, '1h', limit=CONFIG['trend_lookback_hours'] + 1)
if len(ohlcv) < CONFIG['trend_lookback_hours'] + 1:
logger.warning(f"Недостаточно данных для {pair}")
return 0 # Недостаточно данных
# Преобразование в DataFrame для удобного расчета
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
# Расчет % изменения для каждой свечи: (close - open) / open * 100
df['pct_change'] = (df['close'] - df['open']) / df['open'] * 100
# Среднее % изменение за последние N свечей
avg_change = df['pct_change'][-CONFIG['trend_lookback_hours']:].mean()
trend = avg_change if avg_change > 0 else 0 # Только положительные тренды
logger.info(f"Сила тренда для {pair}: {trend}")
return trend
# Получение пар с положительным трендом, отсортированных по объему и силе тренда
def get_top_trending_pairs(exchange, pairs):
logger.info(f"Поиск топ-трендовых пар среди {len(pairs)}")
pair_data = []
tickers = exchange.fetch_tickers(pairs) # Получение объемов снова или кэширование, если нужно
logger.info("Тикеры для трендовых пар получены")
for pair in pairs:
trend = get_trend_strength(exchange, pair)
if trend >= CONFIG['min_trend_strength']:
volume = tickers[pair]['quoteVolume']
pair_data.append({'pair': pair, 'trend': trend, 'volume': volume})
# Сортировка по объему, затем по тренду
pair_data.sort(key=lambda x: (x['volume'], x['trend']), reverse=True)
top_pairs = [p['pair'] for p in pair_data]
logger.info(f"Найдено {len(top_pairs)} топ-пар с положительным трендом")
return top_pairs
# Определение максимального количества сделок на основе конфига или депозита
def get_max_trades(exchange):
if CONFIG['max_concurrent_trades'] > 0:
max_trades = CONFIG['max_concurrent_trades']
logger.info(f"Максимум одновременных сделок из конфига: {max_trades}")
return max_trades
# Иначе, на основе депозита
balance = exchange.fetch_balance()
usdt_balance = balance['free'].get('USDT', 0)
logger.info(f"Свободный баланс USDT: {usdt_balance}")
# Предполагаем, что рискуем risk_per_trade_percent% на сделку, каждая сделка ~ размер входа на мин. ордер
# Для простоты: max_trades = int(100 / risk_per_trade_percent) # Например, 1% риск позволяет 100 сделок теоретически
max_trades = max(1, int(100 / CONFIG['risk_per_trade_percent']))
logger.info(f"Расчетное максимум одновременных сделок: {max_trades}")
return max_trades
# Проверка просадки на 1m графике: текущая цена < SMA3 * (1 - dip_threshold/100)
def check_dip(exchange, pair):
logger.info(f"Проверка просадки для {pair}")
# Получение последних 4 1m свечей (нужно 3 для SMA + текущая)
ohlcv = exchange.fetch_ohlcv(pair, '1m', limit=4)
if len(ohlcv) < 4:
logger.warning(f"Недостаточно данных для просадки {pair}")
return False
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
# SMA3 закрытий свечей 0,1,2 (исключая текущую)
sma3 = df['close'][:-1].mean() # Среднее последних 3 закрытий перед текущей
current_price = df['close'].iloc[-1]
dip_percent = (sma3 - current_price) / sma3 * 100
is_dip = dip_percent > CONFIG['dip_threshold']
logger.info(f"Просадка для {pair}: {dip_percent}%, условие: {is_dip}")
return is_dip
# Вход в сделку: маркет-бай, затем установка TP и SL как лимит-ордеров
def enter_trade(exchange, pair, amount):
logger.info(f"Вход в сделку для {pair} на сумму {amount}")
# Маркет-бай
order = exchange.create_market_buy_order(pair, amount)
entry_price = order['average'] # Средняя цена заполнения
logger.info(f"Входная цена: {entry_price}")
# Расчет цен TP и SL
tp_price = entry_price * (1 + CONFIG['take_profit_percent'] / 100)
sl_price = entry_price * (1 - CONFIG['stop_loss_percent'] / 100)
# Сумма для продажи в TP/SL (предполагая полную позицию)
position_amount = order['filled']
# Создание лимит-селл для TP
exchange.create_limit_sell_order(pair, position_amount, tp_price)
logger.info(f"Установлен TP на {tp_price}")
# Создание стоп-лимит-селл для SL (ccxt поддерживает стоп-ордера через params)
exchange.create_order(pair, 'stop_limit', 'sell', position_amount, sl_price, params={'stopPrice': sl_price})
logger.info(f"Установлен SL на {sl_price}")
print(f"{datetime.now()}: Вошли в сделку на {pair} по {entry_price}, TP {tp_price}, SL {sl_price}")
# Получение минимальной суммы ордера для пары и расчет на основе баланса
def get_trade_amount(exchange, pair, usdt_balance):
logger.info(f"Расчет суммы сделки для {pair}")
market = exchange.market(pair)
min_amount = market['limits']['amount']['min']
# Расчет на основе риска: но поскольку фиксированный SL%, риск = entry * (stop_loss_percent/100)
# Для простоты, используем долю баланса
portion_usdt = usdt_balance / get_max_trades(exchange) # Делим баланс поровну
ticker = exchange.fetch_ticker(pair)
price = ticker['last']
amount = portion_usdt / price
trade_amount = max(min_amount, amount) # Убедимся, что не меньше минимума
logger.info(f"Сумма сделки: {trade_amount}")
return trade_amount
# Основной цикл бота
def main():
exchange = init_exchange()
active_trades = {} # Отслеживание активных пар: {pair: entry_time}
while True:
try:
logger.info("Начало цикла проверки")
# Шаг 1: Получение всех спотовых USDT пар
all_pairs = get_spot_usdt_pairs(exchange)
# Шаг 2: Фильтрация по объему
volume_filtered = filter_by_volume(exchange, all_pairs)
# Шаг 3: Получение топ-трендовых пар
top_pairs = get_top_trending_pairs(exchange, volume_filtered)
# Шаг 4: Определение максимума сделок
max_trades = get_max_trades(exchange)
# Выбор топ-N пар, где N = max_trades - текущие активные
available_slots = max_trades - len(active_trades)
candidates = top_pairs[:available_slots]
logger.info(f"Доступно слотов: {available_slots}, кандидатов: {len(candidates)}")
# Шаг 5: Проверка просадки и вход в сделки
balance = exchange.fetch_balance()
usdt_free = balance['free']['USDT']
logger.info(f"Свободный USDT: {usdt_free}")
for pair in candidates:
if check_dip(exchange, pair):
amount = get_trade_amount(exchange, pair, usdt_free)
enter_trade(exchange, pair, amount)
active_trades[pair] = time.time()
usdt_free -= amount * exchange.fetch_ticker(pair)['last'] # Обновление доступного баланса приблизительно
logger.info(f"Активные сделки: {len(active_trades)}")
# Шаг 6: Проверка активных сделок (опционально: закрытие если нужно, но здесь полагаемся на TP/SL)
# Упрощенно: удаление после времени или проверка баланса
time.sleep(CONFIG['poll_interval_seconds'])
logger.info("Конец цикла, пауза")
except Exception as e:
logger.error(f"Ошибка: {e}")
time.sleep(60) # Пауза при ошибке
if __name__ == "__main__":
main()