huaxin_client/l2_client.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
huaxin_client/l2_data_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2/l2_data_manager_new.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
trade/huaxin/trade_server.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
huaxin_client/l2_client.py
@@ -51,6 +51,8 @@ # 代码的上次成交的订单唯一索引 __last_transaction_keys_dict = {} # 买入的大单订单号 def __init__(self, api): lev2mdapi.CTORATstpLev2MdSpi.__init__(self) self.__api = api @@ -269,6 +271,10 @@ min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) # 输出逐笔成交数据 if pTransaction['ExecType'] == b"2": transaction_big_order_no = l2_data_manager.get_latest_transaction_order_no(code) if transaction_big_order_no == pTransaction['BuyNo']: # 正在成交的订单撤单了 l2_data_manager.trading_order_canceled(code, pTransaction['BuyNo']) if min_volume is None: # 默认筛选50w if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: @@ -294,6 +300,7 @@ item["Side"] = "2" # 深证撤单 print("逐笔委托", item) l2_data_manager.add_l2_order_detail(item, True) else: if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201: @@ -336,6 +343,12 @@ else: self.special_code_volume_for_order_dict.pop(code) if not can_listen: transaction_big_order_no = l2_data_manager.get_latest_transaction_order_no(code) if transaction_big_order_no == pOrderDetail['OrderNO'] and pOrderDetail['OrderStatus'] == b'D': # 正在成交的订单撤单了 l2_data_manager.trading_order_canceled(code,pOrderDetail['OrderNO']) min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) if min_volume is None: # 默认筛选50w @@ -554,6 +567,7 @@ t1.start() __init_l2() l2_data_manager.run_upload_common() l2_data_manager.run_upload_trading_canceled() global l2CommandManager l2CommandManager = command_manager.L2CommandManager() l2CommandManager.init(MyL2ActionCallback()) huaxin_client/l2_data_manager.py
@@ -10,7 +10,7 @@ from huaxin_client.client_network import SendResponseSkManager # 活动时间 from log_module.log import logger_local_huaxin_l2_upload, logger_local_huaxin_l2_error from log_module.log import logger_local_huaxin_l2_error order_detail_upload_active_time_dict = {} transaction_upload_active_time_dict = {} @@ -19,6 +19,21 @@ tmep_transaction_queue_dict = {} target_codes = set() common_queue = queue.Queue() trading_canceled_queue = queue.Queue() # 买入订单号的字典 buy_order_nos_dict = {} # 最近的大单成交单号 latest_big_order_transaction_order_dict = {} # 获取最近的大单成交订单号 def get_latest_transaction_order_no(code): return latest_big_order_transaction_order_dict.get(code) # 正在成交的订单撤单了 def trading_order_canceled(code_, order_no): trading_canceled_queue.put((code_, order_no)) # 添加委托详情 @@ -37,6 +52,11 @@ # "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], # "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], # "OrderStatus": pOrderDetail['OrderStatus'].decode()} if data['Side'] == "1": # 记录所有买入的订单号 if data['SecurityID'] not in buy_order_nos_dict: buy_order_nos_dict[data['SecurityID']] = set() buy_order_nos_dict[data['SecurityID']].add(data['OrderNO']) tmep_order_detail_queue_dict[code].put( (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], @@ -48,7 +68,6 @@ code = data["SecurityID"] if code not in tmep_transaction_queue_dict: tmep_transaction_queue_dict[code] = queue.Queue() # 原来的格式 # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], # "TradeVolume": pTransaction['TradeVolume'], @@ -56,6 +75,11 @@ # "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'], # "ExecType": pTransaction['ExecType'].decode()} # 判断是否为大单成交 code = data['SecurityID'] if code in buy_order_nos_dict: if data['BuyNo'] in buy_order_nos_dict[code]: latest_big_order_transaction_order_dict[code] = data['BuyNo'] tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], data['SellNo'], data['ExecType'])) @@ -114,15 +138,16 @@ # print("请求开始", uid, len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}") result = None start_time = time.time() logger_local_huaxin_l2_upload.info(f"{code} 上传数据开始-{_type}") # logger_local_huaxin_l2_upload.info(f"{code} 上传数据开始-{_type}") try: result = send_response(key, fdata.encode('utf-8')) except Exception as e: logging.exception(e) finally: pass # print("请求结束", uid, result) logger_local_huaxin_l2_upload.info( f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}") # logger_local_huaxin_l2_upload.info( # f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}") # print("上传结果", result) @@ -183,6 +208,17 @@ logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}") def __run_upload_trading_canceled(): print("__run_upload_trading_canceled") while True: try: temp = trading_canceled_queue.get() upload_data(temp[0], "trading_order_canceled", temp[1]) except Exception as e: logger_local_huaxin_l2_error.exception(e) logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}") # 运行上传任务 def run_upload_task(code): # 如果代码没有在目标代码中就不需要运行 @@ -203,6 +239,11 @@ t.start() def run_upload_trading_canceled(): t = threading.Thread(target=lambda: __run_upload_trading_canceled(), daemon=True) t.start() if __name__ == "__main__": code = "603809" target_codes.add(code) l2/l2_data_manager_new.py
@@ -1152,10 +1152,6 @@ @classmethod def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, threshold_money, buy_single_index, max_num_set): def get_threshold_count(): count = threshold_count return count _start_time = t.time() total_datas = local_today_datas[code] # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code) @@ -1213,9 +1209,9 @@ # 只统计59万以上的金额 buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) buy_count += int(total_datas[i]["re"]) if buy_nums >= threshold_num and buy_count >= get_threshold_count(): if buy_nums >= threshold_num and buy_count >= threshold_count: logger_l2_trade_buy.info( f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{get_threshold_count()}, 大单数量:{len(max_buy_num_set)}") f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{threshold_count}, 大单数量:{len(max_buy_num_set)}") elif L2DataUtil.is_limit_up_price_buy_cancel(_val): if _val["num"] >= bigger_num: # 只统计59万以上的金额 @@ -1254,13 +1250,13 @@ for i in max_buy_num_set: max_buy_num_set_count += total_datas[i]["re"] # 有撤单信号,且小于阈值 if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and max_buy_num_set_count >= big_num_count: if buy_nums >= threshold_num and buy_count >= threshold_count and trigger_buy and max_buy_num_set_count >= big_num_count: return i, buy_nums, buy_count, None, max_buy_num_set l2_log.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{} 大单数量:{} 目标大单数量:{}", compute_start_index, buy_nums, threshold_num, buy_count, get_threshold_count(), max_buy_num_set_count, big_num_count) threshold_num, buy_count, threshold_count, max_buy_num_set_count, big_num_count) return None, buy_nums, buy_count, None, max_buy_num_set trade/huaxin/trade_server.py
@@ -332,6 +332,13 @@ logging.exception(e) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "trading_order_canceled": data = data_json["data"] code = data["code"] order_no = data["data"] hx_logger_l2_upload.info(f"{code}-正在成交的订单撤单,order_no:{order_no}") # 执行撤单 l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "G撤撤单", "G撤") else: # 断开连接 break