"""
|
掘金
|
"""
|
|
from __future__ import print_function, absolute_import
|
|
import datetime
|
import json
|
import logging
|
import time as t
|
|
import numpy
|
import schedule
|
|
import gm.api as gmapi
|
|
import big_money_num_manager
|
import client_manager
|
import code_volumn_manager
|
import constant
|
import global_data_loader
|
import global_util
|
import gpcode_first_screen_manager
|
import gpcode_manager
|
import threading
|
|
import server
|
import tool
|
|
from db import redis_manager
|
import authority
|
import decimal
|
|
from trade import trade_gui, l2_trade_util
|
from l2.cancel_buy_strategy import L2LimitUpSellStatisticUtil
|
from l2_code_operate import L2CodeOperate
|
from log import logger_juejin_tick, logger_system
|
from trade.trade_data_manager import CodeActualPriceProcessor
|
from trade.trade_queue_manager import JueJinBuy1VolumnManager
|
|
redisManager = redis_manager.RedisManager(0)
|
__jueJinBuy1VolumnManager = JueJinBuy1VolumnManager()
|
__actualPriceProcessor = CodeActualPriceProcessor()
|
|
|
# 设置账户信息
|
def setAccountInfo(accountId, strategyId, token):
|
redis = redisManager.getRedis()
|
redis.set("juejin-account-id", accountId)
|
redis.set("juejin-strategy-id", strategyId)
|
redis.set("juejin-token", token)
|
|
|
def getAccountInfo():
|
redis = redisManager.getRedis()
|
account_id = redis.get("juejin-account-id")
|
strategy_id = redis.get("juejin-strategy-id")
|
token = redis.get("juejin-token")
|
return account_id, strategy_id, token
|
|
|
def init_data():
|
# 删除所有的涨停卖数据
|
L2LimitUpSellStatisticUtil.clear()
|
# 重置所有的大单数据
|
big_money_num_manager.reset_all()
|
# 清除水下捞数据
|
__actualPriceProcessor.clear_under_water_data()
|
# 载入行业股票代码
|
global_data_loader.load_industry()
|
# 载入代码自由流通市值
|
global_data_loader.load_zyltgb()
|
# 载入量
|
global_data_loader.load_volumn()
|
|
# 9点25之前删除所有代码
|
if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") <= 0:
|
# 删除L2监听代码
|
gpcode_manager.clear_listen_codes()
|
# 删除首板代码
|
gpcode_manager.clear_first_codes()
|
# 删除首板未筛选代码
|
gpcode_first_screen_manager.clear_first_no_screen_codes()
|
# 删除禁止代码
|
l2_trade_util.init_forbidden_trade_codes()
|
# 清空白名单
|
l2_trade_util.WhiteListCodeManager.clear()
|
# 清空想要买
|
gpcode_manager.WantBuyCodesManager.clear()
|
|
|
# 每日初始化
|
def everyday_init():
|
# 交易時間不能做初始化
|
if not tool.is_init_time():
|
raise Exception("交易时间不能初始化")
|
|
codes = gpcode_manager.get_gp_list()
|
logger_system.info("每日初始化")
|
|
# 今日实时涨停
|
global_data_loader.add_limit_up_codes([], True)
|
# 主要获取收盘价
|
get_latest_info(None)
|
# 获取60天最大量与昨日量
|
global_util.today_volumn.clear()
|
global_util.max60_volumn.clear()
|
global_util.yesterday_volumn.clear()
|
volumn_dict = get_volumns(codes)
|
for key in volumn_dict:
|
code_volumn_manager.set_histry_volumn(key, volumn_dict[key][0], volumn_dict[key][1])
|
# 清除大单数据
|
global_util.big_money_num.clear()
|
# 初始化大单数据
|
for code in codes:
|
big_money_num_manager.add_num(code, 0)
|
big_money_num_manager.expire(code)
|
# 清除涨停时间
|
global_util.limit_up_time.clear()
|
init_data()
|
# 初始化同花顺主站
|
l2_clients = client_manager.getValidL2Clients()
|
for client in l2_clients:
|
try:
|
server.repair_ths_main_site(client)
|
except Exception as e:
|
pass
|
|
|
def __run_schedule():
|
while True:
|
schedule.run_pending()
|
|
|
def init(context):
|
# gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30)
|
|
# 订阅浦发银行, bar频率为一天和一分钟
|
# 订阅订阅多个频率的数据,可多次调用subscribe
|
# 获取需要监听的股票
|
init_data()
|
logger_system.info("掘金初始化")
|
schedule.every().day.at("09:15:00").do(everyday_init)
|
t1 = threading.Thread(target=lambda: __run_schedule())
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
|
# 多个时间点获取收盘价
|
gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:30:00')
|
gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:50:00')
|
gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='09:28:00')
|
gmapi.schedule(schedule_func=get_current_info, date_rule='1d', time_rule='09:25:00')
|
gmapi.schedule(schedule_func=get_current_info, date_rule='1d', time_rule='09:29:00')
|
re_subscribe_tick()
|
# re_subscribe_bar()
|
|
# 初始化内容
|
clients = authority.get_l2_clients()
|
for client in clients:
|
for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
|
gpcode_manager.init_listen_code_by_pos(client, i)
|
|
|
def get_latest_info(context):
|
# 初始化内容
|
clients = authority.get_l2_clients()
|
for c in clients:
|
for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
|
gpcode_manager.init_listen_code_by_pos(int(c), i)
|
codes = gpcode_manager.get_gp_list()
|
result = JueJinManager.get_gp_latest_info(codes)
|
for item in result:
|
sec_level = item['sec_level']
|
symbol = item['symbol']
|
symbol = symbol.split(".")[1]
|
pre_close = tool.to_price(decimal.Decimal(str(item['pre_close'])))
|
if sec_level == 1:
|
if symbol in codes:
|
gpcode_manager.set_price_pre(symbol, pre_close)
|
else:
|
gpcode_manager.rm_gp(symbol)
|
|
|
# 获取最新的信息
|
def get_current_info():
|
data = gpcode_manager.get_gp_list()
|
results = JueJinManager.get_gp_current_info(data)
|
logger_juejin_tick.debug("定时获取:{}", results)
|
for result in results:
|
price = result["price"]
|
symbol = result['symbol']
|
# 保存最新价
|
symbol = symbol.split(".")[1]
|
accpt_price(symbol, price)
|
|
|
# 设置收盘价
|
def re_set_price_pre(code):
|
codes = [code]
|
re_set_price_pres(codes)
|
|
|
def re_set_price_pres(codes):
|
result = JueJinManager.get_gp_latest_info(codes)
|
for item in result:
|
symbol = item['symbol']
|
symbol = symbol.split(".")[1]
|
pre_close = tool.to_price(decimal.Decimal(str(item['pre_close'])))
|
gpcode_manager.set_price_pre(symbol, pre_close)
|
|
|
__prices_now = {}
|
|
|
def on_tick(context, tick):
|
# print(tick["created_at"])
|
relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60
|
# 9点20-15:05接受数据
|
start1 = 60 * 60 * 9 + 31 * 60
|
end1 = 60 * 60 * 11 + 35 * 60
|
start2 = 60 * 60 * 12 + 50 * 60
|
end2 = 60 * 60 * 15 + 5 * 60
|
# TODO 测试
|
if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or constant.TEST:
|
symbol = tick['symbol']
|
price = tick['price']
|
# print(symbol,price)
|
# 价格有变化的时候才发送操作
|
# if symbol in __prices_now and __prices_now[symbol] == price:
|
# return
|
|
# 保存最新价
|
symbol = symbol.split(".")[1]
|
JueJinManager.add_listen_code(symbol)
|
time_ = tick["created_at"].strftime("%H:%M:%S")
|
data_ = (symbol, time_, tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"])
|
logger_juejin_tick.info("买1量 {},{},{},{}", data_[1], data_[0], data_[2],
|
data_[3])
|
# 暂时不采用
|
# need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2], data_[3])
|
# if need_sync:
|
# # 同步数据
|
# L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1])
|
|
# print(tick["created_at"],tick["quotes"][0]["bid_v"])
|
|
accpt_price(symbol, price)
|
__prices_now[symbol] = price
|
|
|
# 获取到现价
|
def accpt_price(code, price, price_from="juejin"):
|
return
|
gpcode_manager.set_price(code, price)
|
# 获取收盘价
|
pricePre = gpcode_manager.get_price_pre(code)
|
if pricePre is not None:
|
rate = round((price - pricePre) * 100 / pricePre, 1)
|
if rate >= 7:
|
logger_juejin_tick.info("{}-{}-{}", code, price, rate)
|
if not gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate(
|
code) and not gpcode_manager.is_listen_full():
|
L2CodeOperate.get_instance().add_operate(1, code, "现价变化,rate-{} from-{}".format(rate, price_from))
|
# 进入监控
|
elif rate < 5:
|
# 移除监控
|
if gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate(code):
|
L2CodeOperate.get_instance().add_operate(0, code, "现价变化,rate-{} from-{}".format(rate, price_from))
|
|
|
# 获取到现价
|
def accept_prices(prices):
|
# 获取首板代码
|
first_codes = gpcode_manager.get_first_gp_codes()
|
|
print("价格代码数量:", len(prices))
|
|
__actualPriceProcessor.save_current_price_codes_count(len(prices))
|
# 采集的代码数量不对
|
if len(gpcode_manager.get_gp_list()) - len(prices) > 10:
|
return
|
now_str = tool.get_now_time_str()
|
now_strs = now_str.split(":")
|
now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2])
|
start = 60 * 60 * 9 + 31 * 60
|
if False:
|
for d in prices:
|
code, price = d["code"], float(d["price"])
|
accpt_price(code, price, "ths")
|
else:
|
_code_list = []
|
_delete_list = []
|
for d in prices:
|
code, price = d["code"], float(d["price"])
|
gpcode_manager.set_price(code, price)
|
# 获取收盘价
|
pricePre = gpcode_manager.get_price_pre(code)
|
if pricePre is not None:
|
rate = round((price - pricePre) * 100 / pricePre, 2)
|
if first_codes and code in first_codes:
|
rate = rate / 2
|
if rate >= 0:
|
# 暂存涨幅为正的代码
|
_code_list.append((rate, code))
|
else:
|
# 暂存涨幅为负的代码
|
_delete_list.append((rate, code))
|
try:
|
__actualPriceProcessor.process_rate(code, rate, now_str)
|
except Exception as e:
|
logging.exception(e)
|
|
try:
|
__actualPriceProcessor.save_current_price(code, price,
|
gpcode_manager.get_limit_up_price_by_preprice(
|
pricePre) == tool.to_price(
|
decimal.Decimal(d["price"])))
|
except Exception as e:
|
logging.exception(e)
|
|
# -------------------------------处理交易位置分配---------------------------------
|
# 排序
|
new_code_list = sorted(_code_list, key=lambda e: (e.__getitem__(0), e.__getitem__(1)), reverse=True)
|
# 预填充下单代码
|
_buy_win_codes = []
|
for d in new_code_list:
|
_buy_win_codes.append(d[1])
|
for d in _delete_list:
|
_buy_win_codes.append(d[1])
|
try:
|
trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes)
|
except Exception as e:
|
logging.exception(e)
|
pass
|
|
# -------------------------------处理L2监听---------------------------------
|
|
client_ids = client_manager.getValidL2Clients()
|
# 最多填充的代码数量
|
max_count = len(client_ids) * constant.L2_CODE_COUNT_PER_DEVICE
|
if max_count == 0:
|
max_count = constant.L2_CODE_COUNT_PER_DEVICE
|
|
_delete_list = []
|
for item in new_code_list:
|
if l2_trade_util.is_in_forbidden_trade_codes(item[1]) or item[0] < 0:
|
_delete_list.append(item)
|
|
for item in _delete_list:
|
new_code_list.remove(item)
|
# 截取前几个代码填充
|
add_list = new_code_list[:max_count]
|
# 后面的代码全部删除
|
_delete_list.extend(new_code_list[max_count:])
|
|
add_code_list = []
|
del_code_list = []
|
for d in add_list:
|
add_code_list.append(d[1])
|
|
for d in _delete_list:
|
del_code_list.append(d[1])
|
|
# 后面的代码数量
|
# 先删除应该删除的代码
|
for code in del_code_list:
|
if gpcode_manager.is_listen_old(code):
|
# 判断是否在监听里面
|
L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
|
# 增加应该增加的代码
|
for code in add_code_list:
|
if not gpcode_manager.is_listen_old(code):
|
L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
|
|
# 获取卡位数量
|
free_count = gpcode_manager.get_free_listen_pos_count()
|
if free_count < 2:
|
# 空闲位置不足
|
listen_codes = gpcode_manager.get_listen_codes()
|
for code in listen_codes:
|
if not gpcode_manager.is_in_gp_pool(code):
|
client_id, pos = gpcode_manager.get_listen_code_pos(code)
|
gpcode_manager.set_listen_code_by_pos(client_id, pos, "")
|
free_count += 1
|
if free_count > 2:
|
break
|
|
print(add_code_list, del_code_list)
|
|
|
def on_bar(context, bars):
|
print("on_bar", bars)
|
|
|
def re_subscribe_tick():
|
gpcode_list = gpcode_manager.get_gp_list_with_prefix()
|
codes_str = ",".join(gpcode_list)
|
print("订阅:", codes_str)
|
# TODO
|
gmapi.subscribe(symbols=codes_str, frequency='tick', count=2, wait_group=False, wait_group_timeout='10s',
|
unsubscribe_previous=True)
|
|
|
def re_subscribe_bar():
|
gpcode_list = gpcode_manager.get_gp_list_with_prefix()
|
codes_str = ",".join(gpcode_list)
|
print("订阅:", codes_str)
|
gmapi.subscribe(symbols=codes_str, frequency='60s', count=1,
|
unsubscribe_previous=True)
|
|
|
def recieve_msg(pipe):
|
while True:
|
value = pipe.recv()
|
print("跨进程通信:", value)
|
jsonValue = json.loads(value)
|
action = jsonValue["type"]
|
if action == 'resub':
|
re_subscribe_tick()
|
elif action == 'accpt_price':
|
try:
|
datas = jsonValue["data"]
|
for data in datas:
|
accpt_price(data["code"], float(data["price"]))
|
except Exception as e:
|
print(str(e))
|
|
|
class JueJinManager:
|
def __init__(self, pipe):
|
self.pipe = pipe
|
# 开启线程接受数据
|
t1 = threading.Thread(target=lambda: recieve_msg(pipe))
|
t1.setDaemon(True)
|
t1.start()
|
|
@classmethod
|
def get_gp_latest_info(cls, codes):
|
account_id, s_id, token = getAccountInfo()
|
symbols = gpcode_manager.get_gp_list_with_prefix(codes)
|
gmapi.set_token(token)
|
data = gmapi.get_instruments(symbols=",".join(symbols))
|
print(data)
|
return data
|
|
@classmethod
|
def get_now_price(cls, codes):
|
data = JueJinManager.get_gp_current_info(codes)
|
prices = []
|
for item in data:
|
code = item["symbol"].split(".")[1]
|
price = item["price"]
|
prices.append((code, price))
|
return prices
|
|
# 获取代码的涨幅
|
@classmethod
|
def get_codes_limit_rate(cls, codes):
|
datas = JueJinManager.get_gp_latest_info(codes)
|
pre_price_dict = {}
|
for data in datas:
|
code = data["sec_id"]
|
pre_close = tool.to_price(decimal.Decimal(str(data['pre_close'])))
|
pre_price_dict[code] = pre_close
|
|
now_prices = JueJinManager.get_now_price(codes)
|
f_results = []
|
for data in now_prices:
|
code = data[0]
|
price = data[1]
|
pre_price = float(pre_price_dict.get(code))
|
rate = round((price - pre_price) * 100 / pre_price, 2)
|
f_results.append((code, rate))
|
f_results.sort(key=lambda tup: tup[1])
|
f_results.reverse()
|
return f_results
|
|
@classmethod
|
def get_history_tick_n(cls, code, count, fields=None):
|
account_id, s_id, token = getAccountInfo()
|
symbols = gpcode_manager.get_gp_list_with_prefix([code])
|
gmapi.set_token(token)
|
results = gmapi.history_n(symbol=symbols[0], frequency="1d", count=count, fields=fields)
|
return results
|
|
@classmethod
|
def get_lowest_price_rate(cls, code, count):
|
datas = cls.get_history_tick_n(code, count)
|
low_price = datas[0]["close"]
|
for data in datas:
|
if low_price > data["close"]:
|
low_price = data["close"]
|
return (datas[-1]["close"] - low_price) / low_price
|
|
@classmethod
|
def get_gp_current_info(cls, codes):
|
account_id, s_id, token = getAccountInfo()
|
symbols = gpcode_manager.get_gp_list_with_prefix(codes)
|
gmapi.set_token(token)
|
data = gmapi.current(symbols=",".join(symbols))
|
print(data)
|
return data
|
|
@classmethod
|
def get_gp_codes_names(cls, codes):
|
datas = cls.get_gp_latest_info(codes)
|
results = {}
|
for data in datas:
|
code = data["symbol"].split(".")[1]
|
code_name = data['sec_name']
|
results[code] = code_name
|
return results
|
|
def start(self):
|
account_id, s_id, token = getAccountInfo()
|
gmapi.run(strategy_id=s_id,
|
filename='juejin.py',
|
# todo 回测模式
|
mode=gmapi.MODE_LIVE,
|
backtest_start_time='2022-08-18 09:25:00',
|
backtest_end_time='2022-08-18 10:30:00',
|
token=token)
|
|
def stop(self):
|
gmapi.stop()
|
|
@classmethod
|
def add_listen_code(cls, code):
|
redis = redisManager.getRedis()
|
redis.setex("juejin_listen_code-{}".format(code), 20, "1")
|
|
@classmethod
|
def get_listen_codes_lenth(cls):
|
redis = redisManager.getRedis()
|
keys = redis.keys("juejin_listen_code-*")
|
return len(keys)
|
|
@classmethod
|
def get_previous_trading_date(cls, date):
|
account_id, s_id, token = getAccountInfo()
|
gmapi.set_token(token)
|
return gmapi.get_previous_trading_date("SHSE", date)
|
|
|
def trade(code, volume):
|
account_id, s_id, token = getAccountInfo()
|
gmapi.set_token(token)
|
gmapi.set_account_id(account_id)
|
result = gmapi.order_volume(symbol=code, volume=volume, side=gmapi.OrderSide_Sell,
|
order_type=gmapi.OrderType_Market, position_effect=gmapi.PositionEffect_Close)
|
print(result)
|
|
|
# 获取近90天的最大量与最近的量
|
def get_volumns(codes):
|
end = datetime.datetime.now()
|
# 获取近90天的历史数据
|
cha = datetime.timedelta(days=90)
|
start = end - cha
|
|
account_id, s_id, token = getAccountInfo()
|
gmapi.set_token(token)
|
gmapi.set_account_id(account_id)
|
results = gmapi.history(symbol=gpcode_manager.get_gp_list_with_prefix(codes), frequency="1d",
|
start_time="{:%Y-%m-%d}".format(start),
|
fields="symbol,volume,eob",
|
end_time="{:%Y-%m-%d}".format(end))
|
_fresult = {}
|
|
for result in results:
|
code = result["symbol"].split(".")[1]
|
volumn = int(result["volume"])
|
day = "{:%Y-%m-%d}".format(result["eob"])
|
|
if _fresult.get(code) is None:
|
_fresult[code] = (volumn, volumn)
|
|
if volumn > _fresult[code][0]:
|
_fresult[code] = (volumn, _fresult[code][1])
|
_fresult[code] = (_fresult[code][0], volumn)
|
return _fresult
|
|
|
# 获取近90天的最大量与最近的量
|
# 获取最近一次涨停/涨停下一个交易日的最大值
|
def get_volumns_by_code(code, count=60) -> object:
|
datas = JueJinManager.get_history_tick_n(code, count, "open,high,low,close,volume,pre_close,bob")
|
# 计算
|
datas.sort(key=lambda x: x["bob"], reverse=True)
|
return datas
|
|
|
# 解析最大量
|
def parse_max_volume(datas, is_new_top=False):
|
max_volume = 0
|
|
max_volume_date = None
|
if is_new_top:
|
# 如果是突破前高就取最大量
|
for item in datas:
|
if max_volume < item["volume"]:
|
max_volume = item["volume"]
|
max_volume_date = item["bob"]
|
else:
|
date = None
|
for i in range(len(datas)):
|
# 查询涨停
|
item = datas[i]
|
volume = item["volume"]
|
if max_volume < volume:
|
max_volume = volume
|
max_volume_date = item['bob']
|
# 是否有涨停
|
limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
|
if abs(limit_up_price - item["high"]) < 0.01:
|
next_volume = 0
|
if i > 0:
|
next_volume = datas[i - 1]["volume"]
|
date = datas[i]["bob"]
|
if volume < next_volume:
|
volume = next_volume
|
date = datas[i - 1]["bob"]
|
return volume, volume, date.strftime("%Y-%m-%d")
|
return max_volume, max_volume, max_volume_date.strftime("%Y-%m-%d")
|
|
|
# 是否有涨停
|
def is_have_limit_up(datas):
|
for i in range(len(datas)):
|
item = datas[i]
|
limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
|
if abs(limit_up_price - item["close"]) < 0.01:
|
return True
|
return False
|
|
|
# 首板涨停溢价率
|
def get_limit_up_money_percent(datas):
|
datas.sort(key=lambda x: x["bob"])
|
first_rate_list = []
|
for i in range(0, len(datas)):
|
item = datas[i]
|
limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
|
if abs(limit_up_price - item["close"]) < 0.001 and abs(
|
limit_up_price - datas[i - 1]["close"]) >= 0.001 and 0 < i < len(datas) - 1:
|
# 首板涨停
|
rate = (datas[i + 1]["high"] - datas[i + 1]["pre_close"]) / datas[i + 1]["pre_close"]
|
first_rate_list.append(rate)
|
if not first_rate_list:
|
return 1
|
count = 0
|
for rate in first_rate_list:
|
if rate >= 0.01:
|
count += 1
|
return count / len(first_rate_list)
|
|
|
# 根据涨幅高低分配交易窗口
|
def distribute_buy_win():
|
if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") > 0:
|
raise Exception("只能9:30之前重新分配窗口")
|
|
datas = JueJinManager.get_codes_limit_rate(gpcode_manager.get_gp_list())
|
matrix = numpy.array(datas)
|
codes = matrix[:, 0].tolist()
|
trade_gui.THSBuyWinManagerNew.fill_codes(codes)
|
|
|
if __name__ == '__main__':
|
datas = (get_volumns_by_code("603083", 150))
|
print(datas)
|
print(get_limit_up_money_percent(datas))
|