Pre release #1

Merged
Beesquit merged 5 commits from dev into main 2025-05-19 14:27:37 +03:00
15 changed files with 645 additions and 180 deletions

View File

@ -0,0 +1,48 @@
name: Build and Push Docker Image
on:
release:
types: [published]
env:
REGISTRY: git.frik.su
IMAGE_NAME: ${{ gitea.repository }}
jobs:
build-and-push:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Install Docker
run: curl -fsSL https://get.docker.com | sh
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to registry
uses: docker/login-action@v2
with:
registry: ${{ env.REGISTRY }}
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Extract Docker tags from release
id: meta
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=tag
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
- name: Build and push Docker image
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

7
.gitignore vendored
View File

@ -1,6 +1,11 @@
.venv
src/__pycache__
src/test.py
src/credentials.py
exampleData.py
data.json
data_backup_*.json
botlog.log
src/test2.py
tradingLog.log
generalLog.log
src/.env

4
README.md Normal file
View File

@ -0,0 +1,4 @@
# Side Strategy Bybit Bot
This is a simple semi-automatic trading bot working with Bybit API and written in Python.
The strategy is based on the side moving of the token which crosses user input levels. On the level cross long and short orders are opened.
### For the install and setup guide look at the `setup.md`

25
setup.md Normal file
View File

@ -0,0 +1,25 @@
# Краткий гайд по установке и настройке бота.
Бот устанавливается с помощью **docker compose** файла и **git pull** (предварительно) и последующей лёгкой настройки через файлы.
Данный "гайд" расчитан на **linux** и **docker**, однако вы можете развернуть данное приложение/бота на любой системе с *Python 3*.
### Для установки через `docker compose`:
1) Создайте директорию для бота
2) Введите команду:
```
git clone
```
### После базовой развёртки выполните следующие шаги:
1) Создайте data.json файл и заполните его данным содержимым:
```> {}```
2) Создайте файл `credentials.py` в `src/` и заполните его по аналогии с `cretentialsExample.py`
*Рекомендуется использовать суб аккаунт Bybit для бота*
3) Настройте options.py файл на ваше усмотрение (обратите особо внимание на параметр `testnet`)
4) Погадайте на молочной гуще и сделайте ***4*** круга с пчелиным ульем вокруг сервера
5) Всё готово к запуску, наслаждайтесь!
### Для запуска бота введите следующую команду:
```
docker compose up -d
```
*По надобности используйте `sudo` перед командой*
6) На платформе **Bybit** включите режим хеджирования на все пары, которыми планируете торговать
Спасибо что заглянули, желаем удачной настройки и стабильной работы!

View File

@ -1,78 +1,30 @@
import json
import options
from random import randint
async def parseParams(params):
paramsList = params.split()
paramsDict = {}
for i in range(len(options.paramsLines)):
paramsDict[options.paramsLines[i]] = paramsList[i]
return paramsDict
async def getLevels(amount, highPrice, lowPrice, roundDecimals):
# Returns array of prices from low to high for each level
levels = []
delta = (highPrice - lowPrice)/amount
async def toDictPairParams(pair: str, params):
paramsList = params.split()
if len(paramsList) != len(options.paramsLines):
return -1
levelPrice = lowPrice
for i in range(amount - 1):
levels.append(levelPrice)
levelPrice = round(levelPrice + delta, roundDecimals)
levels.append(highPrice)
return levels
paramsDict = {pair: {}}
for i in range(len(options.paramsLines)):
paramsDict[pair][options.paramsLines[i]] = paramsList[i]
return paramsDict
def floor(value, decimals):
# Rounds float to the lower side with the decimals given
factor = 1/(10**decimals)
return (value//factor)*factor
async def checkPair(pair: str):
currentData = {}
try:
with open('data.json', 'r') as f:
currentData = json.load(f)
except json.decoder.JSONDecodeError as e:
print('WARNING: JSON file is empty! Ignore if your installation is fresh.')
def countDecimals(value):
# Counts decimals in a float
decimals = len(str(value).split('.')[-1])
return decimals
if pair in currentData:
print(pair, ' exists in data file.')
return 1
else:
print(pair, ' not found in data file.')
return 0
async def deletePair(pair: str):
currentData = {}
try:
with open('data.json', 'r') as f:
currentData = json.load(f)
except json.decoder.JSONDecodeError as e:
print('WARNING: JSON file is empty! Ignore if your installation is fresh.')
if pair in currentData:
print(pair, ' exists in data file.')
del currentData[pair]
with open('data.json', 'w', encoding = 'utf-8') as f:
json.dump(currentData, f, ensure_ascii = False, indent = 4)
return 0
else:
print(pair, ' not found in data file.')
return -1
async def mainWrapper(pair: str, params):
newData = await toDictPairParams(pair, params)
if newData == -1:
return -1
currentData = {}
try:
with open('data.json', 'r') as f:
currentData = json.load(f)
except json.decoder.JSONDecodeError as e:
print('WARNING: JSON file is empty! Ignore if your installation is fresh.')
if pair in currentData:
print(pair, ' already exists.')
return -2
else:
with open('data.json', 'w', encoding = 'utf-8') as f:
currentData.update(newData)
json.dump(currentData, f, ensure_ascii = False, indent = 4)
print(pair, ' was added!')
return 0
def getArbus():
# Returnes n arbus (n is random 1 byte positive number)
return ("arbus"*randint(0, 255))

View File

@ -1,75 +1,288 @@
import time
import traceback
import asyncio
from pybit.unified_trading import HTTP
from pybit.unified_trading import WebSocket
import options
import credentials
import jsonProcessing
import arbus
from logger import generalLogger
from logger import tradingLogger
async def getClient(apiKey, apiSecret, testnet):
if testnet:
print('Using testnet API.')
else:
print('Using real API.')
class tradingData:
def __init__(self, pair, levels, highBreak, lowBreak, takeDelta, stopDelta, orderSize):
self.client = HTTP(
testnet = options.testnet,
demo = options.demoTrading,
api_key = credentials.api_key,
api_secret = credentials.api_secret
)
self.pair = pair
self.balance = self.getBalance(pair)
self.levels = levels
self.highBreak = highBreak
self.lowBreak = lowBreak
self.takeDelta = takeDelta
self.stopDelta = stopDelta
self.orderSize = orderSize
self.priceDecimals, self.qtyDecimals, self.minimumQty = self.getFilters(pair)
self.previousPrice = -1
self.counter = 0
def getBalance(self, pair):
coin = pair[:-4]
response = self.client.get_wallet_balance(
accountType = 'UNIFIED',
coin = coin
)
balance = float(response['result']['list'][0]['totalAvailableBalance'])
return balance
def getFilters(self, pair):
instrumentInfo = self.client.get_instruments_info(
symbol = pair,
category = "linear"
)
infoContents = instrumentInfo.get('result').get('list')[0]
minimumQty = float(infoContents.get('lotSizeFilter').get('minOrderQty'))
qtyDecimals = arbus.countDecimals(minimumQty)
priceDecimals = int(infoContents.get('priceScale'))
return priceDecimals, qtyDecimals, minimumQty
def close(self):
jsonProcessing.deletePair(self.pair)
generalLogger.info(f"Closing strategy on {self.pair}")
def checkCloseConditions(self, markPrice):
# If the price is outside of the (lowBreak; highBreak) interval then stop strategy
if not (self.lowBreak < markPrice < self.highBreak):
self.close()
def checkOrderConditions(self, markPrice):
for i in self.levels:
levelPrice = i.get('price')
# If level gets crossed from below or from above then go on
# Can be split for different sides (long or short)
if self.previousPrice <= levelPrice <= markPrice or \
self.previousPrice >= levelPrice >= markPrice:
if i.get('long') == False:
id = self.placeCombiOrder(
True,
markPrice,
'Buy',
levelPrice+self.takeDelta,
levelPrice-self.stopDelta,
)
i['long'] = True
i['longIDs'][0] = id
if i.get('short') == False:
id = self.placeCombiOrder(
True,
markPrice,
'Sell',
levelPrice-self.takeDelta,
levelPrice+self.stopDelta,
)
i['short'] = True
i['shortIDs'][0] = id
def placeCombiOrder(self, useQuoteSymbol, markPrice, side, tp, sl):
positionIdx = 1
if side == 'Buy':
positionIdx = 1
if side == 'Sell':
positionIdx = 2
qty = self.orderSize
if useQuoteSymbol:
qty = arbus.floor(self.orderSize/markPrice, self.qtyDecimals)
orderID = '-1'
if qty >= self.minimumQty and qty < self.balance:
response = self.client.place_order(
category = "linear",
symbol = self.pair,
side = side,
orderType = "Market",
qty = str(qty),
isLeverage = 1,
positionIdx = positionIdx,
takeProfit = str(tp),
stopLoss = str(sl),
tpslMode = "Full"
)
orderID = response.get('result').get('orderId')
generalLogger.info(f"Placed oder on {self.pair} with TP {tp}; SL {sl}")
tradingLogger.info(f"Placed oder on {self.pair} with TP {tp}; SL {sl}")
else:
generalLogger.warning(f"Failed to place order on {self.pair}; qty is too small!")
return orderID
def handlePrice(self, message):
try:
markPrice = float(message.get('data')['markPrice'])
self.checkCloseConditions(markPrice)
if self.previousPrice != -1:
self.checkOrderConditions(markPrice)
self.previousPrice = markPrice
except Exception as e:
generalLogger.error(e)
for line in traceback.format_exception(e):
generalLogger.error(line)
def handlePositionInfo(self, message):
data = message.get('data')
# Usually the 3-order response means SL + market + TP orders were placed.
if len(data) == 3:
orderType = []
orderIDs = []
mainOrderID = ''
f = 0
# Fill order types accordingly to the ids
for i in data:
orderType.append(i['stopOrderType'])
orderIDs.append(i['orderId'])
author = i['createType']
# Verifying it was the TP/SL order placement
if i['stopOrderType'] == '' and author == 'CreateByUser':
mainOrderID = i['orderID']
f = 1
if f:
for i in self.levels:
if mainOrderID == i.get('longIDs')[0]:
for j in range(3):
if orderType[j] == 'StopLoss':
i['longIDs'][1] = orderIDs[j]
if orderType[j] == 'TakeProfit':
i['longIDs'][2] = orderIDs[j]
else:
for i in data:
orderID = i['orderId']
orderStatus = i['orderStatus']
if orderStatus == 'Triggered':
for j in self.levels:
longIDs = j['longIDs']
shortIDs = j['shortIDs']
if orderID in longIDs:
j['long'] = False
j['longIDs'] = ['-1', '-1', '-1']
generalLogger.info(f"Long order on {self.pair} level {j['price']} triggered TP/SL")
if orderID in shortIDs:
j['short'] = False
j['shortIDs'] = ['-1', '-1', '-1']
generalLogger.info(f"Short order on {self.pair} level {j['price']} triggered TP/SL")
if orderStatus == 'Filled':
for j in self.levels:
longIDs = j['longIDs']
shortIDs = j['shortIDs']
if orderID in longIDs:
generalLogger.info(f"Long order on {self.pair} level {j['price']} filled with P&L {i['closedPnl']} and qty {i['qty']}")
tradingLogger.info(f"Long order on {self.pair} level {j['price']} filled with P&L {i['closedPnl']} and qty {i['qty']}")
if orderID in shortIDs:
tradingLogger.info(f"Short order on {self.pair} level {j['price']} filled with P&L {i['closedPnl']} and qty {i['qty']}")
async def getClient(apiKey, apiSecret, testnet, demoTrading):
client = HTTP(
testnet = testnet,
demo = demoTrading,
api_key = apiKey,
api_secret = apiSecret,
)
try:
response = client.get_account_info()
print('Auth succesful!')
print('Account info:', response.get('retMsg'))
generalLogger.info("Got client from getClient")
generalLogger.info(f"Account info: {response.get('retMsg')}")
return client
except Exception as e:
print('Auth failed! Check API key!')
print('Error:', e)
generalLogger.warning("Auth failed! Check API key or internet connection!")
return -1
async def strategy(client: HTTP, pair: str, params):
startTime = time.time()
print('Starting strategy with ', pair)
paramsDict = await arbus.parseParams(params)
async def strategy(pair: str, params):
generalLogger.info('Starting strategy with ' + pair)
paramsDict = await jsonProcessing.parseParams(params)
client = HTTP(testnet=True)
levelsAmount = int(paramsDict['netLevelsAmount'])
highEnd = float(paramsDict['highEnd'])
lowEnd = float(paramsDict['lowEnd'])
instrumentInfo = client.get_instruments_info(
symbol = pair,
category = "linear"
)
infoContents = instrumentInfo.get('result').get('list')[0]
priceDecimals = int(infoContents.get('priceScale'))
# Levels have 3 IDs for both long and short position. The 1 is main order (market opening), 2 is SL and 3 is TP.
levelsRaw = await arbus.getLevels(levelsAmount, highEnd, lowEnd, priceDecimals)
levels = []
for i in range(levelsAmount):
levels.append({'price':levelsRaw[i], 'long':False, 'longIDs':['-1', '-1', '-1'], 'short':False, 'shortIDs':['-1', '-1', '-1']})
td = tradingData(
pair,
levels,
float(paramsDict['highBreak']),
float(paramsDict['lowBreak']),
float(paramsDict['takeDelta']),
float(paramsDict['stopDelta']),
float(paramsDict['orderSize'])
)
ws = WebSocket(
testnet = options.testnet,
channel_type = 'linear',
)
wsPrivate = WebSocket(
testnet = options.testnet,
demo = options.demoTrading,
channel_type = "private",
api_key = credentials.api_key,
api_secret = credentials.api_secret,
)
generalLogger.info(f"Websocket connection state: {ws.is_connected()} (for {pair})")
ws.ticker_stream(
symbol = pair,
callback = td.handlePrice
)
wsPrivate.order_stream(
callback = td.handlePositionInfo
)
i = 0
t = await arbus.checkPair(pair)
t = await jsonProcessing.checkPair(pair)
while t:
t = await arbus.checkPair(pair)
t = await jsonProcessing.checkPair(pair)
if t != 1:
generalLogger.info("Closing websockets for {pair}")
ws.exit()
wsPrivate.exit()
break
# client = getClient(credentials.api_key, credentials.api_secret, options.testnet)
r1 = client.get_order_history(
category=options.category,
symbol=pair
)
print(r1, '\n')
r2 = client.place_order(
category = options.category,
symbol = pair,
side = 'BUY',
orderType = 'Market',
qty = paramsDict['orderSize'],
marketUnit = 'quoteCoin'
)
print(r2, '\n')
if r2['retMsg'] == 'OK':
print('Order placed succesfully!')
await asyncio.sleep(20)
await asyncio.sleep(options.loopSleepTime)
i += 1
print('Ending strategy with ', pair)
print('Ended on the iteration number ', i)
generalLogger.info(f"Ending strategy with {pair}; Ended on the iteration number {i}")
return i

11
src/credentials.py Normal file
View File

@ -0,0 +1,11 @@
from decouple import config
# Bybit
api_key = config('API_KEY', default='')
api_secret = config('API_SECRET', default='')
# Telegram
bot_token = config('BOT_TOKEN', default='')

View File

@ -1,9 +0,0 @@
# Bybit
api_key = "..."
api_secret = "..."
# Telegram
bot_token = "..."

112
src/jsonProcessing.py Normal file
View File

@ -0,0 +1,112 @@
import json
import os
import shutil
from datetime import datetime
from logger import generalLogger
import options
def startUp():
filePath = 'data.json'
if os.path.exists(filePath):
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backupPath = (f'data_backup_{timestamp}.json')
shutil.copy(filePath, backupPath)
generalLogger.info(f'JSON backup was created: {backupPath}')
with open(filePath, 'w') as f:
json.dump({}, f, ensure_ascii = False, indent=4)
generalLogger.info(f'New {filePath} created with empty JSON.')
async def parseParams(params):
# Returnes dictionary of string params as paramsLines in options
paramsList = params.split()
paramsDict = {}
for i in range(len(options.paramsLines)):
paramsDict[options.paramsLines[i]] = paramsList[i]
return paramsDict
async def toDictPairParams(pair: str, params):
# Returnes dictionary as pair:internal(params)
paramsList = params.split()
if len(paramsList) != len(options.paramsLines):
return -1
paramsDict = {pair: {}}
for i in range(len(options.paramsLines)):
paramsDict[pair][options.paramsLines[i]] = paramsList[i]
return paramsDict
async def checkPair(pair: str):
# Returnes 1 if pair exists and 0 if not
currentData = {}
try:
with open('data.json', 'r') as f:
currentData = json.load(f)
except json.decoder.JSONDecodeError as e:
generalLogger.info('WARNING: JSON file is empty! Ignore if your installation is fresh.')
if pair in currentData:
return 1
else:
return 0
async def deletePair(pair: str):
# Returnes 0 if deleted successfully and -1 if not
currentData = {}
try:
with open('data.json', 'r') as f:
currentData = json.load(f)
except json.decoder.JSONDecodeError as e:
generalLogger.info('WARNING: JSON file is empty! Ignore if your installation is fresh.')
if pair in currentData:
del currentData[pair]
with open('data.json', 'w', encoding = 'utf-8') as f:
json.dump(currentData, f, ensure_ascii = False, indent = 4)
generalLogger.info(f'Pair {pair} was deleted successfully!')
return 0
else:
generalLogger.info(f'Pair {pair} was not found in the data file when trying to delete it.')
return -1
async def savePairParams(pair: str, params):
# Saves or updates data in JSON
newData = await toDictPairParams(pair, params)
if newData == -1:
return -1
currentData = {}
try:
with open('data.json', 'r') as f:
currentData = json.load(f)
except json.decoder.JSONDecodeError as e:
generalLogger.info('WARNING: JSON file is empty! Ignore if your installation is fresh.')
if pair in currentData:
generalLogger.info(f"Pair {pair} already exists.")
return -2
else:
with open('data.json', 'w', encoding = 'utf-8') as f:
currentData.update(newData)
json.dump(currentData, f, ensure_ascii = False, indent = 4)
generalLogger.info(f"Pair {pair} was added!")
return 0
async def loadJson():
# Returnes the contents of the JSON file as a dictionary
data = {}
try:
with open('data.json', 'r') as f:
data = json.load(f)
except json.decoder.JSONDecodeError as e:
generalLogger.info('WARNING: JSON file is empty! Ignore if your installation is fresh.')
return data

37
src/logger.py Normal file
View File

@ -0,0 +1,37 @@
import logging
import sys
import options
generalLogPath = "./generalLog.log"
tradingLogPath = "./tradingLog.log"
def setupLogger(name, level, logPath, formatter):
logger = logging.getLogger(name)
logger.setLevel(level)
streamHandler = logging.StreamHandler(sys.stdout)
fileHandler = logging.FileHandler(logPath)
streamHandler.setFormatter(formatter)
fileHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
logger.addHandler(fileHandler)
return logger
# Основной лог
generalFormatter = logging.Formatter('%(asctime)s - %(module)s - %(levelname)s - %(message)s')
generalLogger = setupLogger('general', logging.INFO, generalLogPath, generalFormatter)
# Торговый лог (ордера)
tradingFormatter = logging.Formatter('%(asctime)s - %(message)s')
tradingLogger = setupLogger('trade', logging.NOTSET, tradingLogPath, tradingFormatter)
# Максимально подробный общий лог
if options.showExtraDebugLogs:
logging.basicConfig(level=logging.DEBUG)
superGeneralLogger = logging.getLogger('superGeneral')

View File

@ -12,28 +12,22 @@ from aiogram import Router, F
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from logger import generalLogger
from logger import tradingLogger
import bybit
import arbus
import jsonProcessing
import credentials
import whitelist
import strings
import options
# Входные поля для трейдинг-бота:
# Верхняя граница ордеров
# Нижняя граница ордеров
# Верхняя граница для брейка
# Нижняя граница для брейка
# Количество уровней сетки
# Дельта для тейка
# Дельта для стопа
# Размер позиции на каждом уровне
async def set_commands():
commands = [BotCommand(command='start', description='Старт'),
BotCommand(command='help', description='Инструкция'),
BotCommand(command='info', description='Статус'),
BotCommand(command='strategy', description='Запустить стратегию'),
BotCommand(command='stop', description='Остановить стратегию')
]
@ -60,88 +54,101 @@ stop_router = Router()
@dp.message(Command("start"))
async def commandStart(message: Message) -> None:
print("Called function commandStart")
# print(whitelist.chatIDs)
# id = message.from_user.id
# print('Got message from', id, ' with type ', type(id))
await message.answer(strings.startCommand)
@dp.message(Command("help"))
@dp.message(Command("help"), F.chat.id.in_(whitelist.chatIDs))
async def commandHelp(message: Message) -> None:
print("Called function commandHelp")
await message.answer(strings.helpCommand)
@dp.message(Command("info"), F.chat.id.in_(whitelist.chatIDs))
async def commandInfo(message: Message) -> None:
data = await jsonProcessing.loadJson()
msgText = strings.foundData
if data == {}:
msgText = strings.noData
else:
msgText = strings.foundData
for i in data:
msgText += (f"<b>{str(i)}</b>: P&L - x%\n")
await message.answer(msgText)
@strategy_router.message(Command("strategy"))
@strategy_router.message(Command("strategy"), F.chat.id.in_(whitelist.chatIDs))
async def commandStrategy(message: Message, state: FSMContext):
print("Called function commandStrategy")
await message.answer(strings.strategyCommand + '\n' + strings.askPair)
await state.set_state(startForm.pair)
@strategy_router.message(F.text, startForm.pair)
async def capture_start_pair(message: Message, state: FSMContext):
print("Called function capture_start_pair")
await state.update_data(pair=message.text)
data = await state.get_data()
t = 0
if await arbus.checkPair(data.get("pair")) == 1:
msg_text = (f'Стратегия на паре <b>{data.get("pair")}</b> уже запущена.\nПожалуйста остановите стратегию либо введите другую пару.')
if await jsonProcessing.checkPair(data.get("pair")) == 1:
msgText = (f'Стратегия на паре <b>{data.get("pair")}</b> уже запущена.\nПожалуйста остановите стратегию либо введите другую пару.')
t = 1
else:
msg_text = strings.askParams
msgText = strings.askParams
await message.answer(msg_text)
await message.answer(msgText)
if t == 1:
print('Clearing state!')
await state.clear()
else:
await state.set_state(startForm.params)
@strategy_router.message(F.text, startForm.params)
async def capture_params(message: Message, state: FSMContext):
print("Called function capture_params")
await state.update_data(params=message.text)
data = await state.get_data()
t = await arbus.mainWrapper(pair=data.get("pair"), params=data.get("params"))
t = await jsonProcessing.savePairParams(pair=data.get("pair"), params=data.get("params"))
if t == 0:
client = await bybit.getClient(credentials.api_key, credentials.api_secret, options.testnet)
client = await bybit.getClient(
credentials.api_key,
credentials.api_secret,
options.testnet,
options.demoTrading
)
if client == -1:
msg_text = (f'Аутентификация не удалась, сообщите администратору если увидете данное сообщение.')
msgText = strings.authFailed
await jsonProcessing.deletePair(pair=data.get("pair"))
else:
asyncio.create_task(bybit.strategy(client, data.get("pair"), data.get("params")))
msg_text = (f'Вы запустили стратегию на паре <b>{data.get("pair")}</b> с данными параметрами:\n<b>{data.get("params")}</b>\n')
try:
asyncio.create_task(bybit.strategy(data.get("pair"), data.get("params")))
msgText = (f'Вы запустили стратегию на паре <b>{data.get("pair")}</b> с данными параметрами:\n<b>{data.get("params")}</b>\n')
except:
await jsonProcessing.deletePair(pair=data.get("pair"))
msgText = (f'Возникла ошибка в работе стратегии =( Пожалуйста сообщите об этом администратору.')
elif t == -1:
msg_text = (f'Параметры введены в неверном формате, пожалуйста начните заново.')
msgText = (f'Параметры введены в неверном формате, пожалуйста начните заново.')
elif t == -2:
msg_text = (f'Стратегия на паре <b>{data.get("pair")}</b> уже запущена.')
msgText = (f'Стратегия на паре <b>{data.get("pair")}</b> уже запущена.')
else:
msg_text = (f'Возникла непредвиденная ошибка. =(')
await message.answer(msg_text)
msgText = (f'Возникла непредвиденная ошибка. =(')
await message.answer(msgText)
await state.clear()
@stop_router.message(Command("stop"))
@stop_router.message(Command("stop"), F.chat.id.in_(whitelist.chatIDs))
async def commandStop(message: Message, state: FSMContext):
print("Called function commandStop")
await message.answer(strings.stopCommand + '\n' + strings.askPair)
await state.set_state(stopForm.pair)
@stop_router.message(F.text, stopForm.pair)
async def capture_stop_pair(message: Message, state: FSMContext):
print("Called function capture_stop_pair")
await state.update_data(pair=message.text)
data = await state.get_data()
if await arbus.checkPair(data.get("pair")) == 1:
t = await arbus.deletePair(data.get("pair"))
if t == 0:
print('Deleted pair succesfuly')
else:
print('Error with deleting pair')
msg_text = strings.stopStrategy
t = await jsonProcessing.deletePair(data.get("pair"))
if t == 0:
msgText = strings.stopStrategy
else:
msg_text = strings.pairNotFound
msgText = strings.pairNotFound
await message.answer(msg_text)
await message.answer(msgText)
await state.clear()
@ -158,5 +165,7 @@ async def main() -> None:
# Main
if __name__ == "__main__":
print('Started bot!')
generalLogger.info("Started bot!")
tradingLogger.info("Started bot!")
jsonProcessing.startUp()
asyncio.run(main())

View File

@ -1,15 +1,15 @@
url = 'https://testnet.binance.vision/api' # API url
testnet = True # Use testnet or not
from decouple import config
pairSymbol = 'ETHUSDT' # Trading pair
mainSymbol = 'USDT' # Balance asset
timeScape = '15m' # Candle length
category = 'spot'
leverage = 1 # Leverage
testnet = config('TESTNET', default='False').lower() != 'false' # Use testnet or not
demoTrading = config('DEMOTRADING', default='False').lower() != 'false' # Use demo trading or not
# Please do not combine testnet and demo trading
notification = 1 # Telegram notifications
leverage = int(config('LEVERAGE', default='1')) # Leverage
loopSleepTime = 2 # Time passing between loops/checks
# notification = 1 # Telegram notifications (not currently supported)
showExtraDebugLogs = config('SHOWEXTRADEBUGLOGS', default='False').lower() != 'false'
loopSleepTime = int(config('LOOPSLEEPTIME', default='1')) # Time passing between checks for stopping strategy
paramsLines = ['highEnd',
'lowEnd',

View File

@ -18,7 +18,7 @@ stopBot = "Бот остановлен!"
# Commands
startCommand = "Привет! Это приватный бот для полуавтоматической торговли криптовалютой. Хороших позиций!"
startCommand = "Привет! Это приватный бот для полуавтоматической торговли криптовалютой. В данный момент он работает по вайтлисту. Хороших позиций!"
stopCommand = "Вы собираетесь остановить стратегию."
@ -44,3 +44,12 @@ askParams = "Введите параметры:"
gotParams = "Параметры заданы!"
pairNotFound = "Стратегия на данную монетную пару не найдена."
authFailed = (f'Аутентификация не удалась, пожалуйста сообщите администратору если увидете данное сообщение.')
# Data status
noData = "Нет запущенных стратегий!"
foundData = "В данный момент стратегия запущена на следующих монетных парах:\n"

11
src/whitelist.py Normal file
View File

@ -0,0 +1,11 @@
from decouple import config
import re
def checkWhiteList(id):
# Checks if id exists in the whitelist. Returnes 1 if exists and 0 if not
return id in chatIDs
chatIDsstring = config('WHITELIST', default='')
chatIDs = [int(x) for x in re.split(r',\s*', chatIDsstring)]

38
todo.md Normal file
View File

@ -0,0 +1,38 @@
Версия 0.x.x
### ToFix
- [x] ID бота (создать нового)
- [x] Замена print на логирование
### Новые функции
- [x] Реализация базы программы
- - [x] Общее взаимодействие файлов и функций
- - [x] Обработка ошибок
- [x] Реализация JSON хранилища
- - [x] Функция записи данных в JSON
- - [x] Функция удаления данных из JSON
- - [x] Функция получения данных из JSON
- - [x] Функция проверки наличия данных в JSON
- - [x] Создание JSON файла при первом запуске
- - [x] Сохранение предыдущего JSON файла в архив и создание нового при перезапуске
- [x] Реализация взаимодействия с ботом через телеграмм
- - [x] Реализация команд
- - - [x] Start (общая информация про бота)
- - - [x] Help (шпаргалка по другим командам и порядку заполнения параметров)
- - - [x] Strategy (Запуск стратегии)
- - - [x] Stop (Остановка стратегии)
- - - [x] Info (Информация о запущеных стратегиях)
- - [x] Обеспечение безопасности и приватности бота (через chat id или пароль)
- [x] Реализация стратегии
- - [x] Основная функция для запуска стратегии
- - [x] Класс работы по параметрам
- - [x] Реализация уровней
- - [x] Установка позиций
- [ ] Рализация развёртывания программы
- - [ ] Написать compose.yml
- - [ ] Добавить requirements.txt
- - [ ] Сделать подсасывание контейнера с гита
- - [x] Составить список и реализовать получение переменных окружения
- [ ] QOL
- - [x] Написать todo.md
- - [ ] Написать README.md
- - [ ] Написать setup.md