""" 掘金 """ from __future__ import print_function, absolute_import import datetime import json import logging import time as t import schedule import gm.api as gmapi import big_money_num_manager import client_manager import code_volumn_manager import constant import global_data_loader import global_util import gpcode_manager import threading import l2_trade_util import server import tool import redis_manager import authority import decimal import trade_gui from l2_code_operate import L2CodeOperate import l2_data_manager_new from log import logger_juejin_tick, logger_system from trade_data_manager import CodeActualPriceProcessor from trade_queue_manager import JueJinBuy1VolumnManager redisManager = redis_manager.RedisManager(0) __jueJinBuy1VolumnManager = JueJinBuy1VolumnManager() __actualPriceProcessor = CodeActualPriceProcessor() # 设置账户信息 def setAccountInfo(accountId, strategyId, token): redis = redisManager.getRedis() redis.set("juejin-account-id", accountId) redis.set("juejin-strategy-id", strategyId) redis.set("juejin-token", token) def getAccountInfo(): redis = redisManager.getRedis() account_id = redis.get("juejin-account-id") strategy_id = redis.get("juejin-strategy-id") token = redis.get("juejin-token") return account_id, strategy_id, token def init_data(): # 删除之前的分钟级大单撤单数据 l2_data_manager_new.SecondAverageBigNumComputer.clear_data() l2_data_manager_new.AverageBigNumComputer.clear_data() # 删除所有的涨停卖数据 l2_data_manager_new.L2LimitUpSellStatisticUtil.clear() # 重置所有的大单数据 big_money_num_manager.reset_all() # 清除水下捞数据 __actualPriceProcessor.clear_under_water_data() # 载入行业股票代码 global_data_loader.load_industry() # 载入代码自由流通市值 global_data_loader.load_zyltgb() # 载入量 global_data_loader.load_volumn() # 每日初始化 def everyday_init(): # 交易時間不能做初始化 if not tool.is_init_time(): raise Exception("交易时间不能初始化") codes = gpcode_manager.get_gp_list() logger_system.info("每日初始化") # 今日实时涨停 global_data_loader.add_limit_up_codes([], True) # 主要获取收盘价 get_latest_info(None) # 获取60天最大量与昨日量 global_util.today_volumn.clear() global_util.max60_volumn.clear() global_util.yesterday_volumn.clear() volumn_dict = get_volumns(codes) for key in volumn_dict: code_volumn_manager.set_histry_volumn(key, volumn_dict[key][0], volumn_dict[key][1]) # 清除大单数据 global_util.big_money_num.clear() # 初始化大单数据 for code in codes: big_money_num_manager.add_num(code, 0) big_money_num_manager.expire(code) # 清除涨停时间 global_util.limit_up_time.clear() init_data() # 初始化同花顺主站 l2_clients = client_manager.getValidL2Clients() for client in l2_clients: try: server.repair_ths_main_site(client) except Exception as e: pass def __run_schedule(): while True: schedule.run_pending() def init(context): # gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30) # 订阅浦发银行, bar频率为一天和一分钟 # 订阅订阅多个频率的数据,可多次调用subscribe # 获取需要监听的股票 init_data() logger_system.info("掘金初始化") schedule.every().day.at("09:15:00").do(everyday_init) t1 = threading.Thread(target=lambda: __run_schedule()) # 后台运行 t1.setDaemon(True) t1.start() # 多个时间点获取收盘价 gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:30:00') gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:50:00') gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='09:28:00') gmapi.schedule(schedule_func=get_current_info, date_rule='1d', time_rule='09:25:00') gmapi.schedule(schedule_func=get_current_info, date_rule='1d', time_rule='09:29:00') re_subscribe_tick() # re_subscribe_bar() # 初始化内容 clients = authority.get_l2_clients() for client in clients: for i in range(0, 8): gpcode_manager.init_listen_code_by_pos(client, i) def get_latest_info(context): # 初始化内容 clients = authority.get_l2_clients() for c in clients: for i in range(0, 8): gpcode_manager.init_listen_code_by_pos(int(c), i) codes = gpcode_manager.get_gp_list(); result = JueJinManager.get_gp_latest_info(codes); for item in result: sec_level = item['sec_level'] symbol = item['symbol'] symbol = symbol.split(".")[1] pre_close = tool.to_price(decimal.Decimal(str(item['pre_close']))) if sec_level == 1: if symbol in codes: gpcode_manager.set_price_pre(symbol, pre_close) else: gpcode_manager.rm_gp(symbol) # 获取最新的信息 def get_current_info(): data = gpcode_manager.get_gp_list(); results = JueJinManager.get_gp_current_info(data); logger_juejin_tick.debug("定时获取:{}", results) for result in results: price = result["price"] symbol = result['symbol'] # 保存最新价 symbol = symbol.split(".")[1] accpt_price(symbol, price) # 设置收盘价 def re_set_price_pre(code): codes = [code] re_set_price_pres(codes) def re_set_price_pres(codes): result = JueJinManager.get_gp_latest_info(codes); for item in result: symbol = item['symbol'] symbol = symbol.split(".")[1] pre_close = tool.to_price(decimal.Decimal(str(item['pre_close']))) gpcode_manager.set_price_pre(symbol, pre_close) __prices_now = {} def on_tick(context, tick): # print(tick["created_at"]) relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60 # 9点20-15:05接受数据 start1 = 60 * 60 * 9 + 31 * 60 end1 = 60 * 60 * 11 + 35 * 60 start2 = 60 * 60 * 12 + 50 * 60 end2 = 60 * 60 * 15 + 5 * 60 # TODO 测试 if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or constant.TEST: symbol = tick['symbol'] price = tick['price'] # print(symbol,price) # 价格有变化的时候才发送操作 # if symbol in __prices_now and __prices_now[symbol] == price: # return # 保存最新价 symbol = symbol.split(".")[1] JueJinManager.add_listen_code(symbol) time_ = tick["created_at"].strftime("%H:%M:%S") data_ = (symbol, time_, tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"]) logger_juejin_tick.info("买1量 {},{},{},{}", data_[1], data_[0], data_[2], data_[3]) # 暂时不采用 # need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2], data_[3]) # if need_sync: # # 同步数据 # L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1]) # print(tick["created_at"],tick["quotes"][0]["bid_v"]) accpt_price(symbol, price) __prices_now[symbol] = price # 获取到现价 def accpt_price(code, price, price_from="juejin"): return gpcode_manager.set_price(code, price) # 获取收盘价 pricePre = gpcode_manager.get_price_pre(code) if pricePre is not None: rate = round((price - pricePre) * 100 / pricePre, 1) if rate >= 7: logger_juejin_tick.info("{}-{}-{}", code, price, rate) if not gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate( code) and not gpcode_manager.is_listen_full(): L2CodeOperate.get_instance().add_operate(1, code, "现价变化,rate-{} from-{}".format(rate, price_from)) # 进入监控 elif rate < 5: # 移除监控 if gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate(code): L2CodeOperate.get_instance().add_operate(0, code, "现价变化,rate-{} from-{}".format(rate, price_from)) # 获取到现价 def accpt_prices(prices): print("价格代码数量:", len(prices)) __actualPriceProcessor.save_current_price_codes_count(len(prices)) # 采集的代码数量不对 if len(gpcode_manager.get_gp_list()) - len(prices) > 2: return now_str = tool.get_now_time_str() now_strs = now_str.split(":") now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2]) start = 60 * 60 * 9 + 31 * 60 if False: for d in prices: code, price = d["code"], float(d["price"]) accpt_price(code, price, "ths") else: _code_list = [] _delete_list = [] for d in prices: code, price = d["code"], float(d["price"]) gpcode_manager.set_price(code, price) # 获取收盘价 pricePre = gpcode_manager.get_price_pre(code) if pricePre is not None: rate = round((price - pricePre) * 100 / pricePre, 2) if rate >= 0: # 暂存涨幅为正的代码 _code_list.append((rate, code)) else: # 暂存涨幅为负的代码 _delete_list.append((rate, code)) try: __actualPriceProcessor.process_rate(code, rate, now_str) except Exception as e: logging.exception(e) try: __actualPriceProcessor.save_current_price(code, price, gpcode_manager.get_limit_up_price_by_preprice( pricePre) == tool.to_price( decimal.Decimal(d["price"]))) except Exception as e: logging.exception(e) # 排序 new_code_list = sorted(_code_list, key=lambda e: e.__getitem__(0), reverse=True) # 预填充下单代码 _buy_win_codes = [] for d in new_code_list: _buy_win_codes.append(d[1]) for d in _delete_list: _buy_win_codes.append(d[1]) try: trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes) except Exception as e: logging.exception(e) pass client_ids = client_manager.getValidL2Clients() # 最多填充的代码数量 max_count = len(client_ids) * 8 if max_count == 0: max_count = 8 # 截取前几个代码填充 add_list = new_code_list[:max_count] # 后面的代码全部删除 _delete_list.extend(new_code_list[max_count:]) add_code_list = [] del_list = [] for d in add_list: add_code_list.append(d[1]) for d in _delete_list: del_list.append(d[1]) # 后面的代码数量 # 先删除应该删除的代码 for code in del_list: if gpcode_manager.is_listen_old(code): # 判断是否在监听里面 L2CodeOperate.get_instance().add_operate(0, code, "现价变化") # 增加应该增加的代码 for code in add_code_list: if not gpcode_manager.is_listen_old(code): if not l2_trade_util.is_in_forbidden_trade_codes(code): L2CodeOperate.get_instance().add_operate(1, code, "现价变化") else: if l2_trade_util.is_in_forbidden_trade_codes(code): L2CodeOperate.get_instance().add_operate(0, code, "现价变化") print(add_code_list, del_list) def on_bar(context, bars): print("on_bar", bars) def re_subscribe_tick(): gpcode_list = gpcode_manager.get_gp_list_with_prefix() codes_str = ",".join(gpcode_list) print("订阅:", codes_str) # TODO gmapi.subscribe(symbols=codes_str, frequency='tick', count=2, wait_group=False, wait_group_timeout='10s', unsubscribe_previous=True) def re_subscribe_bar(): gpcode_list = gpcode_manager.get_gp_list_with_prefix() codes_str = ",".join(gpcode_list) print("订阅:", codes_str) gmapi.subscribe(symbols=codes_str, frequency='60s', count=1, unsubscribe_previous=True) def recieve_msg(pipe): while True: value = pipe.recv() print("跨进程通信:", value) jsonValue = json.loads(value) action = jsonValue["type"] if action == 'resub': re_subscribe_tick() elif action == 'accpt_price': try: datas = jsonValue["data"] for data in datas: accpt_price(data["code"], float(data["price"])) except Exception as e: print(str(e)) class JueJinManager: def __init__(self, pipe): self.pipe = pipe # 开启线程接受数据 t1 = threading.Thread(target=lambda: recieve_msg(pipe)) t1.setDaemon(True) t1.start() @staticmethod def get_gp_latest_info(codes): account_id, s_id, token = getAccountInfo() symbols = gpcode_manager.get_gp_list_with_prefix(codes) gmapi.set_token(token) data = gmapi.get_instruments(symbols=",".join(symbols)) print(data) return data @staticmethod def get_gp_current_info(codes): account_id, s_id, token = getAccountInfo() symbols = gpcode_manager.get_gp_list_with_prefix(codes) gmapi.set_token(token) data = gmapi.current(symbols=",".join(symbols)) print(data) return data def start(self): account_id, s_id, token = getAccountInfo() gmapi.run(strategy_id=s_id, filename='juejin.py', # todo 回测模式 mode=gmapi.MODE_LIVE, backtest_start_time='2022-08-18 09:25:00', backtest_end_time='2022-08-18 10:30:00', token=token) def stop(self): gmapi.stop() @classmethod def add_listen_code(cls, code): redis = redisManager.getRedis() redis.setex("juejin_listen_code-{}".format(code), 20, "1") @classmethod def get_listen_codes_lenth(cls): redis = redisManager.getRedis() keys = redis.keys("juejin_listen_code-*") return len(keys) def trade(code, volume): account_id, s_id, token = getAccountInfo() gmapi.set_token(token) gmapi.set_account_id(account_id) result = gmapi.order_volume(symbol=code, volume=volume, side=gmapi.OrderSide_Sell, order_type=gmapi.OrderType_Market, position_effect=gmapi.PositionEffect_Close) print(result) # 获取近90天的最大量与最近的量 def get_volumns(codes): end = datetime.datetime.now() # 获取近90天的历史数据 cha = datetime.timedelta(days=90) start = end - cha account_id, s_id, token = getAccountInfo() gmapi.set_token(token) gmapi.set_account_id(account_id) results = gmapi.history(symbol=gpcode_manager.get_gp_list_with_prefix(codes), frequency="1d", start_time="{:%Y-%m-%d}".format(start), fields="symbol,volume,eob", end_time="{:%Y-%m-%d}".format(end)) _fresult = {} for result in results: code = result["symbol"].split(".")[1] volumn = int(result["volume"]) day = "{:%Y-%m-%d}".format(result["eob"]) if _fresult.get(code) is None: _fresult[code] = (volumn, volumn) if volumn > _fresult[code][0]: _fresult[code] = (volumn, _fresult[code][1]) _fresult[code] = (_fresult[code][0], volumn) return _fresult if __name__ == '__main__': everyday_init()