""" 掘金 """ from __future__ import print_function, absolute_import import datetime import json import logging import time as t import numpy import schedule import gm.api as gmapi import big_money_num_manager import client_manager import code_volumn_manager import constant import global_data_loader import global_util import gpcode_first_screen_manager import gpcode_manager import threading import server import tool from db import redis_manager import authority import decimal from trade import trade_gui, l2_trade_util from l2.cancel_buy_strategy import L2LimitUpSellStatisticUtil from l2_code_operate import L2CodeOperate from log import logger_juejin_tick, logger_system from trade.trade_data_manager import CodeActualPriceProcessor from trade.trade_queue_manager import JueJinBuy1VolumnManager redisManager = redis_manager.RedisManager(0) __jueJinBuy1VolumnManager = JueJinBuy1VolumnManager() __actualPriceProcessor = CodeActualPriceProcessor() # 设置账户信息 def setAccountInfo(accountId, strategyId, token): redis = redisManager.getRedis() redis.set("juejin-account-id", accountId) redis.set("juejin-strategy-id", strategyId) redis.set("juejin-token", token) def getAccountInfo(): redis = redisManager.getRedis() account_id = redis.get("juejin-account-id") strategy_id = redis.get("juejin-strategy-id") token = redis.get("juejin-token") return account_id, strategy_id, token def init_data(): # 删除所有的涨停卖数据 L2LimitUpSellStatisticUtil.clear() # 重置所有的大单数据 big_money_num_manager.reset_all() # 清除水下捞数据 __actualPriceProcessor.clear_under_water_data() # 载入行业股票代码 global_data_loader.load_industry() # 载入代码自由流通市值 global_data_loader.load_zyltgb() # 载入量 global_data_loader.load_volumn() # 9点25之前删除所有代码 if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") <= 0: # 删除L2监听代码 gpcode_manager.clear_listen_codes() # 删除首板代码 gpcode_manager.clear_first_codes() # 删除首板未筛选代码 gpcode_first_screen_manager.clear_first_no_screen_codes() # 删除禁止代码 l2_trade_util.init_forbidden_trade_codes() # 清空白名单 l2_trade_util.WhiteListCodeManager.clear() # 清空想要买 gpcode_manager.WantBuyCodesManager.clear() # 每日初始化 def everyday_init(): # 交易時間不能做初始化 if not tool.is_init_time(): raise Exception("交易时间不能初始化") codes = gpcode_manager.get_gp_list() logger_system.info("每日初始化") # 今日实时涨停 global_data_loader.add_limit_up_codes([], True) # 主要获取收盘价 get_latest_info(None) # 获取60天最大量与昨日量 global_util.today_volumn.clear() global_util.max60_volumn.clear() global_util.yesterday_volumn.clear() volumn_dict = get_volumns(codes) for key in volumn_dict: code_volumn_manager.set_histry_volumn(key, volumn_dict[key][0], volumn_dict[key][1]) # 清除大单数据 global_util.big_money_num.clear() # 初始化大单数据 for code in codes: big_money_num_manager.add_num(code, 0) big_money_num_manager.expire(code) # 清除涨停时间 global_util.limit_up_time.clear() init_data() # 初始化同花顺主站 l2_clients = client_manager.getValidL2Clients() for client in l2_clients: try: server.repair_ths_main_site(client) except Exception as e: pass def __run_schedule(): while True: schedule.run_pending() def init(context): # gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30) # 订阅浦发银行, bar频率为一天和一分钟 # 订阅订阅多个频率的数据,可多次调用subscribe # 获取需要监听的股票 init_data() logger_system.info("掘金初始化") schedule.every().day.at("09:15:00").do(everyday_init) t1 = threading.Thread(target=lambda: __run_schedule()) # 后台运行 t1.setDaemon(True) t1.start() # 多个时间点获取收盘价 gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:30:00') gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:50:00') gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='09:28:00') gmapi.schedule(schedule_func=get_current_info, date_rule='1d', time_rule='09:25:00') gmapi.schedule(schedule_func=get_current_info, date_rule='1d', time_rule='09:29:00') re_subscribe_tick() # re_subscribe_bar() # 初始化内容 clients = authority.get_l2_clients() for client in clients: for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE): gpcode_manager.init_listen_code_by_pos(client, i) def get_latest_info(context): # 初始化内容 clients = authority.get_l2_clients() for c in clients: for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE): gpcode_manager.init_listen_code_by_pos(int(c), i) codes = gpcode_manager.get_gp_list() result = JueJinManager.get_gp_latest_info(codes) for item in result: sec_level = item['sec_level'] symbol = item['symbol'] symbol = symbol.split(".")[1] pre_close = tool.to_price(decimal.Decimal(str(item['pre_close']))) if sec_level == 1: if symbol in codes: gpcode_manager.set_price_pre(symbol, pre_close) else: gpcode_manager.rm_gp(symbol) # 获取最新的信息 def get_current_info(): data = gpcode_manager.get_gp_list() results = JueJinManager.get_gp_current_info(data) logger_juejin_tick.debug("定时获取:{}", results) for result in results: price = result["price"] symbol = result['symbol'] # 保存最新价 symbol = symbol.split(".")[1] accpt_price(symbol, price) # 设置收盘价 def re_set_price_pre(code): codes = [code] re_set_price_pres(codes) def re_set_price_pres(codes): result = JueJinManager.get_gp_latest_info(codes) for item in result: symbol = item['symbol'] symbol = symbol.split(".")[1] pre_close = tool.to_price(decimal.Decimal(str(item['pre_close']))) gpcode_manager.set_price_pre(symbol, pre_close) __prices_now = {} def on_tick(context, tick): # print(tick["created_at"]) relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60 # 9点20-15:05接受数据 start1 = 60 * 60 * 9 + 31 * 60 end1 = 60 * 60 * 11 + 35 * 60 start2 = 60 * 60 * 12 + 50 * 60 end2 = 60 * 60 * 15 + 5 * 60 # TODO 测试 if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or constant.TEST: symbol = tick['symbol'] price = tick['price'] # print(symbol,price) # 价格有变化的时候才发送操作 # if symbol in __prices_now and __prices_now[symbol] == price: # return # 保存最新价 symbol = symbol.split(".")[1] JueJinManager.add_listen_code(symbol) time_ = tick["created_at"].strftime("%H:%M:%S") data_ = (symbol, time_, tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"]) logger_juejin_tick.info("买1量 {},{},{},{}", data_[1], data_[0], data_[2], data_[3]) # 暂时不采用 # need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2], data_[3]) # if need_sync: # # 同步数据 # L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1]) # print(tick["created_at"],tick["quotes"][0]["bid_v"]) accpt_price(symbol, price) __prices_now[symbol] = price # 获取到现价 def accpt_price(code, price, price_from="juejin"): return gpcode_manager.set_price(code, price) # 获取收盘价 pricePre = gpcode_manager.get_price_pre(code) if pricePre is not None: rate = round((price - pricePre) * 100 / pricePre, 1) if rate >= 7: logger_juejin_tick.info("{}-{}-{}", code, price, rate) if not gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate( code) and not gpcode_manager.is_listen_full(): L2CodeOperate.get_instance().add_operate(1, code, "现价变化,rate-{} from-{}".format(rate, price_from)) # 进入监控 elif rate < 5: # 移除监控 if gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate(code): L2CodeOperate.get_instance().add_operate(0, code, "现价变化,rate-{} from-{}".format(rate, price_from)) # 获取到现价 def accept_prices(prices): # 获取首板代码 first_codes = gpcode_manager.get_first_gp_codes() print("价格代码数量:", len(prices)) __actualPriceProcessor.save_current_price_codes_count(len(prices)) # 采集的代码数量不对 if len(gpcode_manager.get_gp_list()) - len(prices) > 10: return now_str = tool.get_now_time_str() now_strs = now_str.split(":") now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2]) start = 60 * 60 * 9 + 31 * 60 if False: for d in prices: code, price = d["code"], float(d["price"]) accpt_price(code, price, "ths") else: _code_list = [] _delete_list = [] for d in prices: code, price = d["code"], float(d["price"]) gpcode_manager.set_price(code, price) # 获取收盘价 pricePre = gpcode_manager.get_price_pre(code) if pricePre is not None: rate = round((price - pricePre) * 100 / pricePre, 2) if first_codes and code in first_codes: rate = rate / 2 if rate >= 0: # 暂存涨幅为正的代码 _code_list.append((rate, code)) else: # 暂存涨幅为负的代码 _delete_list.append((rate, code)) try: __actualPriceProcessor.process_rate(code, rate, now_str) except Exception as e: logging.exception(e) try: __actualPriceProcessor.save_current_price(code, price, gpcode_manager.get_limit_up_price_by_preprice( pricePre) == tool.to_price( decimal.Decimal(d["price"]))) except Exception as e: logging.exception(e) # -------------------------------处理交易位置分配--------------------------------- # 排序 new_code_list = sorted(_code_list, key=lambda e: (e.__getitem__(0), e.__getitem__(1)), reverse=True) # 预填充下单代码 _buy_win_codes = [] for d in new_code_list: _buy_win_codes.append(d[1]) for d in _delete_list: _buy_win_codes.append(d[1]) try: trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes) except Exception as e: logging.exception(e) pass # -------------------------------处理L2监听--------------------------------- client_ids = client_manager.getValidL2Clients() # 最多填充的代码数量 max_count = len(client_ids) * constant.L2_CODE_COUNT_PER_DEVICE if max_count == 0: max_count = constant.L2_CODE_COUNT_PER_DEVICE _delete_list = [] for item in new_code_list: if l2_trade_util.is_in_forbidden_trade_codes(item[1]) or item[0] < 0: _delete_list.append(item) for item in _delete_list: new_code_list.remove(item) # 截取前几个代码填充 add_list = new_code_list[:max_count] # 后面的代码全部删除 _delete_list.extend(new_code_list[max_count:]) add_code_list = [] del_code_list = [] for d in add_list: add_code_list.append(d[1]) for d in _delete_list: del_code_list.append(d[1]) # 后面的代码数量 # 先删除应该删除的代码 for code in del_code_list: if gpcode_manager.is_listen_old(code): # 判断是否在监听里面 L2CodeOperate.get_instance().add_operate(0, code, "现价变化") # 增加应该增加的代码 for code in add_code_list: if not gpcode_manager.is_listen_old(code): L2CodeOperate.get_instance().add_operate(1, code, "现价变化") # 获取卡位数量 free_count = gpcode_manager.get_free_listen_pos_count() if free_count < 2: # 空闲位置不足 listen_codes = gpcode_manager.get_listen_codes() for code in listen_codes: if not gpcode_manager.is_in_gp_pool(code): client_id, pos = gpcode_manager.get_listen_code_pos(code) gpcode_manager.set_listen_code_by_pos(client_id, pos, "") free_count += 1 if free_count > 2: break print(add_code_list, del_code_list) def on_bar(context, bars): print("on_bar", bars) def re_subscribe_tick(): gpcode_list = gpcode_manager.get_gp_list_with_prefix() codes_str = ",".join(gpcode_list) print("订阅:", codes_str) # TODO gmapi.subscribe(symbols=codes_str, frequency='tick', count=2, wait_group=False, wait_group_timeout='10s', unsubscribe_previous=True) def re_subscribe_bar(): gpcode_list = gpcode_manager.get_gp_list_with_prefix() codes_str = ",".join(gpcode_list) print("订阅:", codes_str) gmapi.subscribe(symbols=codes_str, frequency='60s', count=1, unsubscribe_previous=True) def recieve_msg(pipe): while True: value = pipe.recv() print("跨进程通信:", value) jsonValue = json.loads(value) action = jsonValue["type"] if action == 'resub': re_subscribe_tick() elif action == 'accpt_price': try: datas = jsonValue["data"] for data in datas: accpt_price(data["code"], float(data["price"])) except Exception as e: print(str(e)) class JueJinManager: def __init__(self, pipe): self.pipe = pipe # 开启线程接受数据 t1 = threading.Thread(target=lambda: recieve_msg(pipe)) t1.setDaemon(True) t1.start() @classmethod def get_gp_latest_info(cls, codes): account_id, s_id, token = getAccountInfo() symbols = gpcode_manager.get_gp_list_with_prefix(codes) gmapi.set_token(token) data = gmapi.get_instruments(symbols=",".join(symbols)) print(data) return data @classmethod def get_now_price(cls, codes): data = JueJinManager.get_gp_current_info(codes) prices = [] for item in data: code = item["symbol"].split(".")[1] price = item["price"] prices.append((code, price)) return prices # 获取代码的涨幅 @classmethod def get_codes_limit_rate(cls, codes): datas = JueJinManager.get_gp_latest_info(codes) pre_price_dict = {} for data in datas: code = data["sec_id"] pre_close = tool.to_price(decimal.Decimal(str(data['pre_close']))) pre_price_dict[code] = pre_close now_prices = JueJinManager.get_now_price(codes) f_results = [] for data in now_prices: code = data[0] price = data[1] pre_price = float(pre_price_dict.get(code)) rate = round((price - pre_price) * 100 / pre_price, 2) f_results.append((code, rate)) f_results.sort(key=lambda tup: tup[1]) f_results.reverse() return f_results @classmethod def get_history_tick_n(cls, code, count, fields=None): account_id, s_id, token = getAccountInfo() symbols = gpcode_manager.get_gp_list_with_prefix([code]) gmapi.set_token(token) results = gmapi.history_n(symbol=symbols[0], frequency="1d", count=count, fields=fields) return results @classmethod def get_lowest_price_rate(cls, code, count): datas = cls.get_history_tick_n(code, count) low_price = datas[0]["close"] for data in datas: if low_price > data["close"]: low_price = data["close"] return (datas[-1]["close"] - low_price) / low_price @classmethod def get_gp_current_info(cls, codes): account_id, s_id, token = getAccountInfo() symbols = gpcode_manager.get_gp_list_with_prefix(codes) gmapi.set_token(token) data = gmapi.current(symbols=",".join(symbols)) print(data) return data @classmethod def get_gp_codes_names(cls, codes): datas = cls.get_gp_latest_info(codes) results = {} for data in datas: code = data["symbol"].split(".")[1] code_name = data['sec_name'] results[code] = code_name return results def start(self): account_id, s_id, token = getAccountInfo() gmapi.run(strategy_id=s_id, filename='juejin.py', # todo 回测模式 mode=gmapi.MODE_LIVE, backtest_start_time='2022-08-18 09:25:00', backtest_end_time='2022-08-18 10:30:00', token=token) def stop(self): gmapi.stop() @classmethod def add_listen_code(cls, code): redis = redisManager.getRedis() redis.setex("juejin_listen_code-{}".format(code), 20, "1") @classmethod def get_listen_codes_lenth(cls): redis = redisManager.getRedis() keys = redis.keys("juejin_listen_code-*") return len(keys) @classmethod def get_previous_trading_date(cls, date): account_id, s_id, token = getAccountInfo() gmapi.set_token(token) return gmapi.get_previous_trading_date("SHSE", date) def trade(code, volume): account_id, s_id, token = getAccountInfo() gmapi.set_token(token) gmapi.set_account_id(account_id) result = gmapi.order_volume(symbol=code, volume=volume, side=gmapi.OrderSide_Sell, order_type=gmapi.OrderType_Market, position_effect=gmapi.PositionEffect_Close) print(result) # 获取近90天的最大量与最近的量 def get_volumns(codes): end = datetime.datetime.now() # 获取近90天的历史数据 cha = datetime.timedelta(days=90) start = end - cha account_id, s_id, token = getAccountInfo() gmapi.set_token(token) gmapi.set_account_id(account_id) results = gmapi.history(symbol=gpcode_manager.get_gp_list_with_prefix(codes), frequency="1d", start_time="{:%Y-%m-%d}".format(start), fields="symbol,volume,eob", end_time="{:%Y-%m-%d}".format(end)) _fresult = {} for result in results: code = result["symbol"].split(".")[1] volumn = int(result["volume"]) day = "{:%Y-%m-%d}".format(result["eob"]) if _fresult.get(code) is None: _fresult[code] = (volumn, volumn) if volumn > _fresult[code][0]: _fresult[code] = (volumn, _fresult[code][1]) _fresult[code] = (_fresult[code][0], volumn) return _fresult # 获取近90天的最大量与最近的量 # 获取最近一次涨停/涨停下一个交易日的最大值 def get_volumns_by_code(code, count=60) -> object: datas = JueJinManager.get_history_tick_n(code, count, "open,high,low,close,volume,pre_close,bob") # 计算 datas.sort(key=lambda x: x["bob"], reverse=True) return datas # 解析最大量 def parse_max_volume(datas, is_new_top=False): max_volume = 0 max_volume_date = None if is_new_top: # 如果是突破前高就取最大量 for item in datas: if max_volume < item["volume"]: max_volume = item["volume"] max_volume_date = item["bob"] else: date = None for i in range(len(datas)): # 查询涨停 item = datas[i] volume = item["volume"] if max_volume < volume: max_volume = volume max_volume_date = item['bob'] # 是否有涨停 limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) if abs(limit_up_price - item["high"]) < 0.01: next_volume = 0 if i > 0: next_volume = datas[i - 1]["volume"] date = datas[i]["bob"] if volume < next_volume: volume = next_volume date = datas[i - 1]["bob"] return volume, volume, date.strftime("%Y-%m-%d") return max_volume, max_volume, max_volume_date.strftime("%Y-%m-%d") # 是否有涨停 def is_have_limit_up(datas): for i in range(len(datas)): item = datas[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) if abs(limit_up_price - item["close"]) < 0.01: return True return False # 首板涨停溢价率 def get_limit_up_money_percent(datas): datas.sort(key=lambda x: x["bob"]) first_rate_list = [] for i in range(0, len(datas)): item = datas[i] limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) if abs(limit_up_price - item["close"]) < 0.001 and abs( limit_up_price - datas[i - 1]["close"]) >= 0.001 and 0 < i < len(datas) - 1: # 首板涨停 rate = (datas[i + 1]["high"] - datas[i + 1]["pre_close"]) / datas[i + 1]["pre_close"] first_rate_list.append(rate) if not first_rate_list: return 1 count = 0 for rate in first_rate_list: if rate >= 0.01: count += 1 return count / len(first_rate_list) # 根据涨幅高低分配交易窗口 def distribute_buy_win(): if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") > 0: raise Exception("只能9:30之前重新分配窗口") datas = JueJinManager.get_codes_limit_rate(gpcode_manager.get_gp_list()) matrix = numpy.array(datas) codes = matrix[:, 0].tolist() trade_gui.THSBuyWinManagerNew.fill_codes(codes) if __name__ == '__main__': datas = (get_volumns_by_code("603083", 150)) print(datas) print(get_limit_up_money_percent(datas))