用 Python 和币安 API 构建数字货币交易机器人(一)
Python中文社区
共 7258字,需浏览 15分钟
·
2021-05-26 00:43
/exchanges
/strategies
/models
./models/model.py
from datetime import datetime
class AbstractModel:
created: datetime
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
./models/price
from models.model import AbstractModel
class Price(AbstractModel):
pair: str = ''
exchange: str = ''
current: float = 0
lowest: float = 0
highest: float = 0
currency: str = ''
asset: str = ''
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.pair = self.get_pair()
def get_pair(self):
return self.currency + '_' + self.asset
./strategies/strategy.py
import json
import threading
import time
from datetime import datetime
from decouple import config
from models.price import Price
class Strategy(object):
price: Price
def __init__(self, exchange, interval=60, *args, **kwargs):
self._timer = None
self.interval = interval
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.portfolio = {}
self.exchange = exchange
# Load account portfolio for pair at load
self.get_portfolio()
def _run(self):
self.is_running = False
self.start()
self.run(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
print(datetime.now())
if self._timer is None:
self.next_call = time.time()
else:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
def get_portfolio(self):
self.portfolio = {'currency': self.exchange.get_asset_balance(self.exchange.currency),
'asset': self.exchange.get_asset_balance(self.exchange.asset)}
def get_price(self):
try:
self.price = self.exchange.symbol_ticker()
except Exception as e:
pass
...
./strategies/watcher.py
from exchanges.exchange import Exchange
from strategies.strategy import Strategy
class Watcher(Strategy):
def __init__(self, exchange: Exchange, timeout=60, *args, **kwargs):
super().__init__(exchange, timeout, *args, **kwargs)
def run(self):
self.get_price()
print('*******************************')
print('Exchange: ', self.exchange.name)
print('Pair: ', self.exchange.get_symbol())
print('Available: ', self.portfolio['currency'] + ' ' + self.exchange.currency)
print('Available: ', self.portfolio['asset'] + ' ' + self.exchange.asset)
print('Price: ', self.price.current)
https://github.com/binance/binance/binance-spot-api-docs
https://pypi.org/project/requests/
评论