Files
TradingBot-with-BybitAPI/src/main.py
2025-04-29 19:32:43 +03:00

163 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from aiogram import Bot, Dispatcher
from aiogram.filters import Command
from aiogram.types import Message
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.types import BotCommand, BotCommandScopeDefault
from aiogram.fsm.context import FSMContext
from aiogram import Router, F
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
import bybit
import jsonProcessing
import credentials
import strings
import options
# Входные поля для трейдинг-бота:
# Верхняя граница ордеров
# Нижняя граница ордеров
# Верхняя граница для брейка
# Нижняя граница для брейка
# Количество уровней сетки
# Дельта для тейка
# Дельта для стопа
# Размер позиции на каждом уровне
async def set_commands():
commands = [BotCommand(command='start', description='Старт'),
BotCommand(command='help', description='Инструкция'),
BotCommand(command='strategy', description='Запустить стратегию'),
BotCommand(command='stop', description='Остановить стратегию')
]
await bot.set_my_commands(commands, BotCommandScopeDefault())
class startForm(StatesGroup):
pair = State()
params = State()
class stopForm(StatesGroup):
pair = State()
dp = Dispatcher(storage=MemoryStorage())
bot = Bot(
token=credentials.bot_token,
default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
strategy_router = Router()
stop_router = Router()
@dp.message(Command("start"))
async def commandStart(message: Message) -> None:
print("Called function commandStart")
await message.answer(strings.startCommand)
@dp.message(Command("help"))
async def commandHelp(message: Message) -> None:
print("Called function commandHelp")
await message.answer(strings.helpCommand)
@strategy_router.message(Command("strategy"))
async def commandStrategy(message: Message, state: FSMContext):
print("Called function commandStrategy")
await message.answer(strings.strategyCommand + '\n' + strings.askPair)
await state.set_state(startForm.pair)
@strategy_router.message(F.text, startForm.pair)
async def capture_start_pair(message: Message, state: FSMContext):
print("Called function capture_start_pair")
await state.update_data(pair=message.text)
data = await state.get_data()
t = 0
if await jsonProcessing.checkPair(data.get("pair")) == 1:
msg_text = (f'Стратегия на паре <b>{data.get("pair")}</b> уже запущена.\nПожалуйста остановите стратегию либо введите другую пару.')
t = 1
else:
msg_text = strings.askParams
await message.answer(msg_text)
if t == 1:
print('Clearing state!')
await state.clear()
else:
await state.set_state(startForm.params)
@strategy_router.message(F.text, startForm.params)
async def capture_params(message: Message, state: FSMContext):
print("Called function capture_params")
await state.update_data(params=message.text)
data = await state.get_data()
t = await jsonProcessing.savePairParams(pair=data.get("pair"), params=data.get("params"))
if t == 0:
client = await bybit.getClient(credentials.api_key, credentials.api_secret, options.testnet)
if client == -1:
msg_text = (f'Аутентификация не удалась, сообщите администратору если увидете данное сообщение.')
else:
asyncio.create_task(bybit.socketStrategy(data.get("pair"), data.get("params")))
msg_text = (f'Вы запустили стратегию на паре <b>{data.get("pair")}</b> с данными параметрами:\n<b>{data.get("params")}</b>\n')
elif t == -1:
msg_text = (f'Параметры введены в неверном формате, пожалуйста начните заново.')
elif t == -2:
msg_text = (f'Стратегия на паре <b>{data.get("pair")}</b> уже запущена.')
else:
msg_text = (f'Возникла непредвиденная ошибка. =(')
await message.answer(msg_text)
await state.clear()
@stop_router.message(Command("stop"))
async def commandStop(message: Message, state: FSMContext):
print("Called function commandStop")
await message.answer(strings.stopCommand + '\n' + strings.askPair)
await state.set_state(stopForm.pair)
@stop_router.message(F.text, stopForm.pair)
async def capture_stop_pair(message: Message, state: FSMContext):
print("Called function capture_stop_pair")
await state.update_data(pair=message.text)
data = await state.get_data()
if await jsonProcessing.checkPair(data.get("pair")) == 1:
t = await jsonProcessing.deletePair(data.get("pair"))
if t == 0:
print('Deleted pair succesfuly')
else:
print('Error with deleting pair')
msg_text = strings.stopStrategy
else:
msg_text = strings.pairNotFound
await message.answer(msg_text)
await state.clear()
async def start_bot():
await set_commands()
async def main() -> None:
dp.include_router(strategy_router)
dp.include_router(stop_router)
dp.startup.register(start_bot)
await dp.start_polling(bot)
# Main
if __name__ == "__main__":
print('Started bot!')
asyncio.run(main())