| | |
| | | from huaxin_client import socket_util, l1_subscript_codes_manager |
| | | import xmdapi |
| | | from huaxin_client import tool, constant |
| | | from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript |
| | | from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug |
| | | from third_data import custom_block_in_money_manager |
| | | from utils import tool as out_tool |
| | | |
| | | ################B类################## |
| | | ADDRESS = "udp://224.224.1.19:7880" |
| | |
| | | |
| | | def subscribe_codes(self, codes_sh, codes_sz): |
| | | # 重新订阅代码 |
| | | print(f"订阅数量:sh-{len(codes_sh)} sz-{len(codes_sz)}") |
| | | if codes_sh: |
| | | ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE) |
| | | if ret != 0: |
| | | print('SubscribeMarketData fail, ret[%d]' % ret) |
| | | # print('SubscribeMarketData fail, ret[%d]' % ret) |
| | | pass |
| | | else: |
| | | print('SubscribeMarketData success, ret[%d]' % ret) |
| | | # print('SubscribeMarketData success, ret[%d]' % ret) |
| | | pass |
| | | |
| | | if codes_sz: |
| | | ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE) |
| | | if ret != 0: |
| | | print('SubscribeMarketData fail, ret[%d]' % ret) |
| | | # print('SubscribeMarketData fail, ret[%d]' % ret) |
| | | pass |
| | | else: |
| | | print('SubscribeMarketData success, ret[%d]' % ret) |
| | | # print('SubscribeMarketData success, ret[%d]' % ret) |
| | | pass |
| | | |
| | | def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID): |
| | | if pRspInfoField.ErrorID == 0: |
| | | print('Login success! [%d]' % nRequestID) |
| | | # print('Login success! [%d]' % nRequestID) |
| | | |
| | | ''' |
| | | 订阅行情 |
| | |
| | | |
| | | |
| | | else: |
| | | print('Login fail!!! [%d] [%d] [%s]' |
| | | % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) |
| | | pass |
| | | # print('Login fail!!! [%d] [%d] [%s]' |
| | | # % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) |
| | | |
| | | def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField): |
| | | if pRspInfoField.ErrorID == 0: |
| | | print('OnRspSubMarketData: OK!') |
| | | # print('OnRspSubMarketData: OK!') |
| | | pass |
| | | else: |
| | | print('OnRspSubMarketData: Error! [%d] [%s]' |
| | | % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) |
| | | # print('OnRspSubMarketData: Error! [%d] [%s]' |
| | | # % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) |
| | | pass |
| | | |
| | | def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField): |
| | | if pRspInfoField.ErrorID == 0: |
| | | print('OnRspUnSubMarketData: OK!') |
| | | # print('OnRspUnSubMarketData: OK!') |
| | | pass |
| | | else: |
| | | print('OnRspUnSubMarketData: Error! [%d] [%s]' |
| | | % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) |
| | | pass |
| | | # print('OnRspUnSubMarketData: Error! [%d] [%s]' |
| | | # % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) |
| | | |
| | | def OnRtnMarketData(self, pMarketDataField): |
| | | if pMarketDataField.SecurityName.find("S") == 0: |
| | | return |
| | | if pMarketDataField.SecurityName.find("ST") >= 0: |
| | | return |
| | | close_price = round(pMarketDataField.UpperLimitPrice / 1.1, 2) |
| | | rate = round((pMarketDataField.LastPrice - close_price) * 100 / close_price, 2) |
| | | # print(pMarketDataField.SecurityID, pMarketDataField.SecurityName, rate, pMarketDataField.Volume) |
| | | |
| | | close_price = pMarketDataField.PreClosePrice |
| | | lastPrice = pMarketDataField.LastPrice |
| | | if pMarketDataField.BidPrice1: |
| | | lastPrice = pMarketDataField.BidPrice1 |
| | | rate = round((lastPrice - close_price) * 100 / close_price, 2) |
| | | if out_tool.get_limit_up_rate(pMarketDataField.SecurityID) > 1.1001: |
| | | # 涨停板20%以上的打折 |
| | | rate = rate / 2 |
| | | # (代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间) |
| | | level1_data_dict[pMarketDataField.SecurityID] = ( |
| | | pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(), |
| | | pMarketDataField.BidPrice1, pMarketDataField.BidVolume1) |
| | | # print( |
| | | # "SecurityID[%s] SecurityName[%s] LastPrice[%.2f] Volume[%d] Turnover[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]" |
| | | # % (pMarketDataField.SecurityID, pMarketDataField.SecurityName, pMarketDataField.LastPrice, |
| | | # pMarketDataField.Volume, |
| | | # pMarketDataField.Turnover, pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, |
| | | # pMarketDataField.AskPrice1, |
| | | # pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice)) |
| | | pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2, |
| | | pMarketDataField.BidVolume2, pMarketDataField.UpdateTime) |
| | | |
| | | |
| | | __latest_subscript_codes = set() |
| | | |
| | | |
| | | def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas): |
| | | if not tool.is_trade_time(): |
| | | if not tool.is_trade_time() and not tool.is_pre_trade_time(): |
| | | return |
| | | # 上传数据 |
| | | type_ = "set_target_codes" |
| | |
| | | logger_local_huaxin_l1.info(f"({request_id})新增加订阅的代码:{add_codes}") |
| | | |
| | | |
| | | is_re_subscript = False |
| | | |
| | | |
| | | # 重新订阅代码 |
| | | def re_subscript(spi: MdSpi): |
| | | try: |
| | | global is_re_subscript |
| | | if is_re_subscript: |
| | | return |
| | | is_re_subscript = True |
| | | codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes() |
| | | if len(codes_sh) > 100 and len(codes_sz) > 100: |
| | | logger_local_huaxin_l1.info(f"重新订阅 sh-{len(codes_sh)} sz-{len(codes_sz)}") |
| | |
| | | codes = set(data["data"]) |
| | | global __position_codes |
| | | __position_codes = codes |
| | | logger_local_huaxin_l1.info(f"收到策略消息:{data}", ) |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(1) |
| | | |
| | | |
| | | def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w): |
| | | logger_local_huaxin_l1.info("运行l1订阅服务") |
| | | def __run_subscript_task(spi): |
| | | """ |
| | | 运行订阅任务,在9:19到9:29之间开始订阅 |
| | | @return: |
| | | """ |
| | | is_re_subscript = False |
| | | while True: |
| | | try: |
| | | # 判断是否需要重新订阅 |
| | | if tool.is_pre_trade_time(): |
| | | re_subscript(spi) |
| | | is_re_subscript = True |
| | | if is_re_subscript: |
| | | break |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(3) |
| | | |
| | | |
| | | def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fixed_codes=None): |
| | | """ |
| | | 运行l1订阅任务 |
| | | |
| | | @param queue_l1_w_strategy_r: L1方写,策略方读 |
| | | @param queue_l1_r_strategy_w: L1方读,策略方写 |
| | | @param fixed_codes: 固定要返回数据的代码 |
| | | @return: |
| | | """ |
| | | if fixed_codes is None: |
| | | fixed_codes = set() |
| | | logger_local_huaxin_l1.info(f"运行l1订阅服务,固定代码:{fixed_codes}") |
| | | codes_sh = [] |
| | | codes_sz = [] |
| | | for i in range(15): |
| | | try: |
| | | logger_local_huaxin_l1.info("开始获取目标代码") |
| | | codes_sh, codes_sz = l1_subscript_codes_manager.get_codes() |
| | | logger_local_huaxin_l1.info(f"获取上证,深证代码数量:sh-{len(codes_sh)} sz-{len(codes_sz)}") |
| | | break |
| | |
| | | # "002292", 8.06, 9.96, 969500 * 100, time.time()) |
| | | |
| | | threading.Thread(target=__read_from_strategy, args=(queue_l1_r_strategy_w,), daemon=True).start() |
| | | |
| | | threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start() |
| | | |
| | | # 等待程序结束 |
| | | while True: |
| | | print("数量", len(level1_data_dict)) |
| | | # print("数量", len(level1_data_dict)) |
| | | try: |
| | | if len(level1_data_dict) < 1: |
| | | continue |
| | |
| | | # (代码,现价,涨幅,量,时间) |
| | | list_ = [level1_data_dict[k] for k in level1_data_dict] |
| | | flist = [] |
| | | plist = [] |
| | | now_time_int = int(tool.get_now_time_str().replace(":", "")) |
| | | threshold_rate = constant.L1_MIN_RATE |
| | | for d in list_: |
| | | if d[2] >= constant.L1_MIN_RATE: |
| | | # 涨幅小于5%的需要删除 |
| | | if d[2] >= threshold_rate or d[0] in fixed_codes: |
| | | # 涨幅小于3%的需要删除 |
| | | flist.append(d) |
| | | if d[0] in __position_codes: |
| | | plist.append(d) |
| | | flist.sort(key=lambda x: x[2], reverse=True) |
| | | datas = flist[:200] |
| | | # 将持仓股加入进去 |
| | | datas.extend(plist) |
| | | print("代码数量:", len(datas)) |
| | | logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas)) |
| | | __upload_codes_info(queue_l1_w_strategy_r, datas) |
| | | # 将固定代码的排在最前 |
| | | for code in fixed_codes: |
| | | if code in level1_data_dict: |
| | | flist.insert(0, level1_data_dict[code]) |
| | | # 正式交易之前先处理比较少的数据,不然处理时间久造成数据拥堵 |
| | | MAX_COUNT = 500 |
| | | if now_time_int < int("092600"): |
| | | MAX_COUNT = 200 |
| | | elif now_time_int < int("092800"): |
| | | MAX_COUNT = 300 |
| | | elif now_time_int < int("092900"): |
| | | MAX_COUNT = 400 |
| | | datas = flist[:MAX_COUNT] |
| | | if len(datas) > 0: |
| | | logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas)) |
| | | __upload_codes_info(queue_l1_w_strategy_r, datas) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | logger_debug.exception(e) |
| | | finally: |
| | | time.sleep(3) |
| | | try: |
| | | # 判断是否需要重新订阅 |
| | | if tool.is_pre_trade_time(): |
| | | re_subscript(spi) |
| | | else: |
| | | global is_re_subscript |
| | | is_re_subscript = False |
| | | except: |
| | | pass |
| | | |
| | | # 释放接口对象 |
| | | api.Release() |