from __future__ import print_function, absolute_import
|
|
import datetime
|
import json
|
import time as t
|
|
import gm.api as gmapi
|
import gpcode_manager
|
import threading
|
import tool
|
|
import redis_manager
|
import authority
|
import decimal
|
from l2_code_operate import L2CodeOperate
|
from log import logger_juejin_tick
|
|
# Module-level RedisManager instance; the account helpers in this module
# obtain their Redis client from it via getRedis().
redisManager = redis_manager.RedisManager()
|
|
|
def setAccountInfo(accountId, strategyId, token):
    """Store the JueJin account credentials in Redis.

    Args:
        accountId: Value written under the ``juejin-account-id`` key.
        strategyId: Value written under the ``juejin-strategy-id`` key.
        token: Value written under the ``juejin-token`` key.
    """
    client = redisManager.getRedis()
    entries = (
        ("juejin-account-id", accountId),
        ("juejin-strategy-id", strategyId),
        ("juejin-token", token),
    )
    for key, value in entries:
        client.set(key, value)
|
|
|
def getAccountInfo():
    """Read the stored JueJin credentials back from Redis.

    Returns:
        A ``(account_id, strategy_id, token)`` tuple holding whatever the
        Redis client returns for the ``juejin-account-id``,
        ``juejin-strategy-id`` and ``juejin-token`` keys.
    """
    client = redisManager.getRedis()
    keys = ("juejin-account-id", "juejin-strategy-id", "juejin-token")
    account_id, strategy_id, token = [client.get(key) for key in keys]
    return account_id, strategy_id, token
|
|
|
def init(context):
    """Register the daily jobs, subscribe to ticks and reset the listen slots.

    Args:
        context: Passed in by the caller and not used here (presumably the
            gm strategy context -- confirm against the gm runtime).
    """
    # gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30)
    print("掘金初始化")

    # Instrument info is refreshed at three times of day, followed by two
    # current-price snapshots at 09:29:00 and 09:29:35.
    daily_jobs = (
        (get_latest_info, '08:30:00'),
        (get_latest_info, '08:50:00'),
        (get_latest_info, '09:15:00'),
        (get_current_info, '09:29:00'),
        (get_current_info, '09:29:35'),
    )
    for job, run_at in daily_jobs:
        gmapi.schedule(schedule_func=job, date_rule='1d', time_rule=run_at)

    re_subscribe_tick()
    # re_subscribe_bar()

    # Initialise positions 0-7 for every L2 client.
    for client in authority.get_l2_clients():
        for position in range(8):
            gpcode_manager.init_listen_code_by_pos(client, position)
|
|
|
def get_latest_info(context):
    """Reset the listen slots and refresh the previous close of every code.

    For each instrument returned by ``JueJinManager.get_gp_latest_info``:
    when ``sec_level`` equals 1 its previous close is stored through
    ``gpcode_manager.set_price_pre``; otherwise the code is dropped with
    ``gpcode_manager.rm_gp``.

    Args:
        context: Not used by this function.
    """
    # Initialise positions 0-7 for every L2 client.
    for client in authority.get_l2_clients():
        for position in range(8):
            gpcode_manager.init_listen_code_by_pos(int(client), position)

    instruments = JueJinManager.get_gp_latest_info(gpcode_manager.get_gp_list())
    for info in instruments:
        level = info['sec_level']
        # Symbols look like "<exchange>.<code>"; keep only the code part.
        code = info['symbol'].split(".")[1]
        previous_close = tool.to_price(decimal.Decimal(str(info['pre_close'])))
        if level != 1:
            gpcode_manager.rm_gp(code)
            continue
        gpcode_manager.set_price_pre(code, previous_close)
|
|
|
def get_current_info(context=None):
    """Fetch the current price of every tracked code and feed it to accpt_price.

    This function is registered with ``gmapi.schedule`` in ``init``; the
    scheduler passes a context argument to its callback, so the parameter
    must exist. It defaults to ``None`` so the function can still be called
    directly without arguments.

    Args:
        context: Optional and unused; present only so the function is usable
            as a scheduled callback.
    """
    codes = gpcode_manager.get_gp_list()
    snapshots = JueJinManager.get_gp_current_info(codes)
    logger_juejin_tick.debug("定时获取:{}", snapshots)
    for snapshot in snapshots:
        price = snapshot["price"]
        # Symbols look like "<exchange>.<code>"; keep only the code part.
        code = snapshot['symbol'].split(".")[1]
        # Record the latest price and re-evaluate monitoring for this code.
        accpt_price(code, price)
|
|
|
def set_price_pre(code):
    """Fetch and store the previous close for a single code.

    Delegates to ``set_price_pres`` with a one-element list.
    """
    set_price_pres([code])
|
|
|
def set_price_pres(codes):
    """Fetch instrument info for ``codes`` and store each previous close.

    Args:
        codes: Codes handed to ``JueJinManager.get_gp_latest_info``.
    """
    for info in JueJinManager.get_gp_latest_info(codes):
        # Symbols look like "<exchange>.<code>"; keep only the code part.
        code = info['symbol'].split(".")[1]
        previous_close = tool.to_price(decimal.Decimal(str(info['pre_close'])))
        gpcode_manager.set_price_pre(code, previous_close)
|
|
|
# Most recent price per bare stock code; on_tick writes into this dict.
__prices_now = {}
|
|
|
def on_tick(context, tick):
    """Handle one tick: inside the accepted time windows, record its price.

    Args:
        context: Not used by this function.
        tick: Mapping read for its ``symbol``, ``price`` and ``created_at``
            entries.
    """
    # print(tick["created_at"])
    # UTC seconds-of-day shifted by a fixed +8 hours.
    seconds_of_day = t.time() % (24 * 60 * 60) + 8 * 60 * 60
    # Ticks are accepted strictly inside 09:24-11:35 and 12:50-15:05.
    windows = (
        (60 * 60 * 9 + 24 * 60, 60 * 60 * 11 + 35 * 60),
        (60 * 60 * 12 + 50 * 60, 60 * 60 * 15 + 5 * 60),
    )
    # TODO testing switch: when True, ticks are processed at any time.
    test = False
    in_window = any(begin < seconds_of_day < finish for begin, finish in windows)
    if not (in_window or test):
        return

    raw_symbol = tick['symbol']
    price = tick['price']
    # Disabled: only act when the price actually changed.
    # if symbol in __prices_now and __prices_now[symbol] == price:
    #     return

    # Symbols look like "<exchange>.<code>"; keep only the code part.
    code = raw_symbol.split(".")[1]
    logger_juejin_tick.info("{} {} {}".format(code, price, tick["created_at"]))
    # Save the latest price.
    accpt_price(code, price)
    __prices_now[code] = price
|
|
|
def accpt_price(code, price):
    """Store the current price and queue a monitoring change based on the gain.

    The percentage change against the stored previous close is rounded to one
    decimal. At 7 or above the code is queued with operate type 1 (start
    monitoring) unless it is already listened to, already has an operation
    pending, or the listen slots are full. Below 5 a listened code with no
    pending operation is queued with operate type 0 (stop monitoring).
    Nothing is queued in between, or when no previous close is stored.

    Args:
        code: Bare stock code.
        price: Current price.
    """
    gpcode_manager.set_price(code, price)
    # Previous close for this code.
    previous_close = gpcode_manager.get_price_pre(code)
    if previous_close is None:
        return

    change_pct = round((price - previous_close) * 100 / previous_close, 1)
    if change_pct >= 7:
        print(code, price, change_pct)
        unavailable = (
            gpcode_manager.is_listen(code)
            or gpcode_manager.is_operate(code)
            or gpcode_manager.is_listen_full()
        )
        if not unavailable:
            # Start monitoring.
            L2CodeOperate.get_instance().add_operate(1, code)
        return

    if change_pct < 5:
        # Stop monitoring.
        if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code):
            L2CodeOperate.get_instance().add_operate(0, code)
|
|
|
def on_bar(context, bars):
    """Print the received bars; ``context`` is not used."""
    print("on_bar", bars)
|
|
|
def re_subscribe_tick():
    """Subscribe to tick data for every tracked code, replacing earlier subscriptions."""
    symbols = ",".join(gpcode_manager.get_gp_list_with_prefix())
    print("订阅:", symbols)
    # TODO
    options = dict(
        frequency='tick',
        count=2,
        wait_group=False,
        wait_group_timeout='10s',
        unsubscribe_previous=True,
    )
    gmapi.subscribe(symbols=symbols, **options)
|
|
|
def re_subscribe_bar():
    """Subscribe to 60-second bars for every tracked code, replacing earlier subscriptions."""
    symbols = ",".join(gpcode_manager.get_gp_list_with_prefix())
    print("订阅:", symbols)
    options = dict(frequency='60s', count=1, unsubscribe_previous=True)
    gmapi.subscribe(symbols=symbols, **options)
|
|
|
def recieve_msg(pipe):
    """Receive messages from ``pipe`` until it is closed.

    Every received value is printed. The value ``'resub'`` triggers
    ``re_subscribe_tick()``.

    Args:
        pipe: Object exposing a blocking ``recv()`` method (presumably a
            ``multiprocessing`` connection -- confirm against the caller).

    Returns:
        None, once ``recv()`` raises ``EOFError``.
    """
    while True:
        try:
            message = pipe.recv()
        except EOFError:
            # The sending end is closed and nothing more will arrive; leave
            # the loop instead of letting the thread die with a traceback.
            return
        print(message)
        if message == 'resub':
            re_subscribe_tick()
|
|
|
class JueJinManager:
    """Wrapper around the gm API calls used by this module.

    Creating an instance starts a daemon thread that runs ``recieve_msg`` on
    the given pipe. The two static lookups can be used without an instance.
    """

    def __init__(self, pipe):
        """Keep ``pipe`` and start the background receiver thread.

        Args:
            pipe: Object handed to ``recieve_msg``.
        """
        self.pipe = pipe
        # Background thread that receives messages from the pipe. The
        # ``daemon`` attribute replaces the deprecated ``setDaemon()`` call.
        listener = threading.Thread(target=recieve_msg, args=(pipe,))
        listener.daemon = True
        listener.start()

    @staticmethod
    def get_gp_latest_info(codes):
        """Return ``gmapi.get_instruments`` data for ``codes``.

        The codes are expanded with ``gpcode_manager.get_gp_list_with_prefix``
        and the stored token is applied before the query. The result is also
        printed.
        """
        _, _, token = getAccountInfo()
        symbols = gpcode_manager.get_gp_list_with_prefix(codes)
        gmapi.set_token(token)
        data = gmapi.get_instruments(symbols=",".join(symbols))
        print(data)
        return data

    @staticmethod
    def get_gp_current_info(codes):
        """Return ``gmapi.current`` data for ``codes``.

        The codes are expanded with ``gpcode_manager.get_gp_list_with_prefix``
        and the stored token is applied before the query. The result is also
        printed.
        """
        _, _, token = getAccountInfo()
        symbols = gpcode_manager.get_gp_list_with_prefix(codes)
        gmapi.set_token(token)
        data = gmapi.current(symbols=",".join(symbols))
        print(data)
        return data

    def start(self):
        """Run the strategy in live mode using the stored strategy id and token."""
        _, strategy_id, token = getAccountInfo()
        gmapi.run(strategy_id=strategy_id,
                  filename='juejin.py',
                  # TODO: backtest mode
                  mode=gmapi.MODE_LIVE,
                  backtest_start_time='2022-08-18 09:25:00',
                  backtest_end_time='2022-08-18 10:30:00',
                  token=token)

    def stop(self):
        """Stop the running strategy via ``gmapi.stop()``."""
        gmapi.stop()
|
|
|
def trade(code, volume):
    """Place a market sell order that closes ``volume`` of ``code``.

    The stored token and account id are applied first. The order result is
    printed and not returned.

    Args:
        code: Symbol passed to ``gmapi.order_volume`` as ``symbol``.
        volume: Order volume.
    """
    account_id, _, token = getAccountInfo()
    gmapi.set_token(token)
    gmapi.set_account_id(account_id)
    outcome = gmapi.order_volume(
        symbol=code,
        volume=volume,
        side=gmapi.OrderSide_Sell,
        order_type=gmapi.OrderType_Market,
        position_effect=gmapi.PositionEffect_Close,
    )
    print(outcome)
|
|
|
def get_volumns(codes):
    """Summarise the daily volumes of ``codes`` over the last 90 days.

    Args:
        codes: Codes expanded with ``gpcode_manager.get_gp_list_with_prefix``
            before querying ``gmapi.history``.

    Returns:
        A dict mapping each bare code to
        ``{"max_volumn": <largest volume>, "latest_volumn": <volume of the
        last row seen for that code>}``.
    """
    date_format = "{:%Y-%m-%d}"
    now = datetime.datetime.now()
    # Look back 90 days from now.
    window_start = now - datetime.timedelta(days=90)

    account_id, _, token = getAccountInfo()
    gmapi.set_token(token)
    gmapi.set_account_id(account_id)
    bars = gmapi.history(
        symbol=gpcode_manager.get_gp_list_with_prefix(codes),
        frequency="1d",
        start_time=date_format.format(window_start),
        fields="symbol,volume,eob",
        end_time=date_format.format(now),
    )

    summary = {}
    for bar in bars:
        # Symbols look like "<exchange>.<code>"; keep only the code part.
        code = bar["symbol"].split(".")[1]
        volume = int(bar["volume"])
        # NOTE(review): the formatted date is never used afterwards; it is
        # kept so an unformattable "eob" still raises exactly as before.
        day = date_format.format(bar["eob"])
        stats = summary.get(code)
        if stats is None:
            stats = {"max_volumn": volume, "latest_volumn": volume}
            summary[code] = stats
        if volume > stats["max_volumn"]:
            stats["max_volumn"] = volume
        # Overwritten on every row, so the last row for a code wins.
        stats["latest_volumn"] = volume
    return summary
|
|
|
if __name__ == "__main__":
    # Manual entry point: runs a single current-price fetch. The commented
    # calls are alternative ad-hoc checks.
    # trade("SZSE.000521", 100)
    # print("")
    # JueJinManager.get_gp_latest_info(["000592","002808"])
    get_current_info()
|
|
# data_process.saveCodeVolumn(get_volumns(["000333","002911"]))
|
# _redis_manager = redis_manager.RedisManager()
|
# redis = _redis_manager.getRedis()
|
# keys = redis.keys("test-inrec")
|
# print(keys)
|