| | |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from huaxin_client.trade_transform_protocol import TradeResponse |
| | | from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \ |
| | | GCancelBigNumComputer, SecondCancelBigNumComputer |
| | |
| | | elif data_json["type"] == "response": |
| | | # 主动触发的响应 |
| | | try: |
| | | client_id = data_json.get("client_id") |
| | | hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}") |
| | | # 设置响应内容 |
| | | trade_api.set_response(data_json["request_id"], data_json['data']) |
| | | my_trade_response.OnTradeResponse(data_json) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "trade_callback": |
| | | try: |
| | | # 交易回调 |
| | | data_json = data_json["data"] |
| | | type_ = data_json["type"] |
| | | # 记录交易反馈日志 |
| | | hx_logger_trade_callback.info(data_json) |
| | | # 重新请求委托列表与资金 |
| | | huaxin_trade_data_update.add_delegate_list() |
| | | huaxin_trade_data_update.add_deal_list() |
| | | huaxin_trade_data_update.add_money_list() |
| | | # print("响应结果:", data_json['data']) |
| | | my_trade_response.OnTradeCallback(data_json) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "l2_order": |
| | |
| | | data = data_json["data"] |
| | | code = data["code"] |
| | | datas = data["data"] |
| | | hx_logger_l2_transaction.info(f"{code}#{datas}") |
| | | if datas: |
| | | # 设置成交价 |
| | | current_price_process_manager.set_trade_price(code, datas[-1][1]) |
| | | try: |
| | | buyno_map = l2_data_util.local_today_buyno_map.get(code) |
| | | hx_logger_l2_transaction.info( |
| | | f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}") |
| | | buy_progress_index = None |
| | | for i in range(len(datas) - 1, -1, -1): |
| | | d = datas[i] |
| | | buy_no = f"{d[6]}" |
| | | if buyno_map and buy_no in buyno_map: |
| | | hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}") |
| | | buy_progress_index = buyno_map[buy_no]["index"] |
| | | break |
| | | |
| | | # 获取执行位时间 |
| | | buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( |
| | | code) |
| | | if True: |
| | | if buy_progress_index is not None: |
| | | self.__TradeBuyQueue.set_traded_index(code, buy_progress_index) |
| | | total_datas = l2_data_util.local_today_datas.get(code) |
| | | num_operate_map = l2_data_util.local_today_num_operate_map.get( |
| | | code) |
| | | logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code, |
| | | buy_progress_index) |
| | | buy_time = total_datas[buy_progress_index]["val"]["time"] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if buy_exec_index: |
| | | need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code, |
| | | buy_progress_index, |
| | | buy_exec_index, |
| | | total_datas, |
| | | num_operate_map, |
| | | num * 100 * float( |
| | | limit_up_price), |
| | | limit_up_price) |
| | | if need_cancel: |
| | | L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel") |
| | | |
| | | f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time, |
| | | buy_exec_index, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code, |
| | | buy_progress_index, |
| | | total_datas) |
| | | f3 = dask.delayed( |
| | | deal_big_money_manager.DealComputeProgressManager().set_trade_progress)( |
| | | code, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | |
| | | f4 = dask.delayed( |
| | | SecondCancelBigNumComputer().set_transaction_index)( |
| | | code, |
| | | buy_progress_index) |
| | | |
| | | dask.compute(f1, f2, f3, f4) |
| | | except Exception as e: |
| | | hx_logger_l2_transaction.exception(e) |
| | | TradeServerProcessor.l2_transaction(code, datas) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "l2_market_data": |
| | |
| | | data = data_json["data"] |
| | | code = data["code"] |
| | | data = data["data"] |
| | | time_str = f"{data['dataTimeStamp']}" |
| | | if time_str.startswith("9"): |
| | | time_str = "0" + time_str |
| | | time_str = time_str[:6] |
| | | time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}" |
| | | buy_1_price, buy_1_volume = data["buy"][0] |
| | | sell_1_price, sell_1_volume = data["sell"][0] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | | if limit_up_price is not None: |
| | | # 处理买1,卖1信息 |
| | | code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str, |
| | | limit_up_price, |
| | | sell_1_price, sell_1_volume // 100) |
| | | pre_close_price = round(float(limit_up_price) / 1.1, 2) |
| | | # 如果涨幅大于7%就读取板块 |
| | | price_rate = (buy_1_price - pre_close_price) / pre_close_price |
| | | if price_rate > 0.07: |
| | | if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code): |
| | | blocks = kpl_api.getCodeJingXuanBlocks(code) |
| | | self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks) |
| | | elif price_rate > 0.03: |
| | | # 添加备用板块 |
| | | if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True): |
| | | blocks = kpl_api.getCodeJingXuanBlocks(code) |
| | | self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True) |
| | | |
| | | # 更新板块信息 |
| | | yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() |
| | | CodePlateKeyBuyManager.update_can_buy_blocks(code, |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, |
| | | yesterday_codes, |
| | | block_info.get_before_blocks_dict()) |
| | | |
| | | hx_logger_l2_market_data.info(f"{code}#{data}") |
| | | TradeServerProcessor.l2_market_data(code, data) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "l2_subscript_codes": |
| | |
| | | |
| | | # 交易服务处理器 |
| | | class TradeServerProcessor: |
| | | __TradeBuyQueue = transaction_progress.TradeBuyQueue() |
| | | __KPLCodeJXBlockManager = KPLCodeJXBlockManager() |
| | | |
| | | # 设置目标代码 |
| | | @classmethod |
| | | def set_target_codes(cls, data_json): |
| | |
| | | f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}") |
| | | l2_log.threadIds[code] = random.randint(0, 100000) |
| | | l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas) |
| | | |
| | | @classmethod |
| | | def l2_transaction(cls, code, datas): |
| | | hx_logger_l2_transaction.info(f"{code}#{datas}") |
| | | if datas: |
| | | # 设置成交价 |
| | | current_price_process_manager.set_trade_price(code, datas[-1][1]) |
| | | try: |
| | | buyno_map = l2_data_util.local_today_buyno_map.get(code) |
| | | hx_logger_l2_transaction.info( |
| | | f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}") |
| | | buy_progress_index = None |
| | | for i in range(len(datas) - 1, -1, -1): |
| | | d = datas[i] |
| | | buy_no = f"{d[6]}" |
| | | if buyno_map and buy_no in buyno_map: |
| | | hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}") |
| | | buy_progress_index = buyno_map[buy_no]["index"] |
| | | break |
| | | |
| | | # 获取执行位时间 |
| | | buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( |
| | | code) |
| | | if True: |
| | | if buy_progress_index is not None: |
| | | cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index) |
| | | total_datas = l2_data_util.local_today_datas.get(code) |
| | | num_operate_map = l2_data_util.local_today_num_operate_map.get( |
| | | code) |
| | | logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code, |
| | | buy_progress_index) |
| | | buy_time = total_datas[buy_progress_index]["val"]["time"] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if buy_exec_index: |
| | | need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code, |
| | | buy_progress_index, |
| | | buy_exec_index, |
| | | total_datas, |
| | | num_operate_map, |
| | | num * 100 * float( |
| | | limit_up_price), |
| | | limit_up_price) |
| | | if need_cancel: |
| | | L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel") |
| | | |
| | | f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time, |
| | | buy_exec_index, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code, |
| | | buy_progress_index, |
| | | total_datas) |
| | | f3 = dask.delayed( |
| | | deal_big_money_manager.DealComputeProgressManager().set_trade_progress)( |
| | | code, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | |
| | | f4 = dask.delayed( |
| | | SecondCancelBigNumComputer().set_transaction_index)( |
| | | code, |
| | | buy_progress_index) |
| | | |
| | | dask.compute(f1, f2, f3, f4) |
| | | except Exception as e: |
| | | hx_logger_l2_transaction.exception(e) |
| | | |
| | | @classmethod |
| | | def l2_market_data(cls, code, data): |
| | | time_str = f"{data['dataTimeStamp']}" |
| | | if time_str.startswith("9"): |
| | | time_str = "0" + time_str |
| | | time_str = time_str[:6] |
| | | time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}" |
| | | buy_1_price, buy_1_volume = data["buy"][0] |
| | | sell_1_price, sell_1_volume = data["sell"][0] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | | if limit_up_price is not None: |
| | | # 处理买1,卖1信息 |
| | | code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str, |
| | | limit_up_price, |
| | | sell_1_price, sell_1_volume // 100) |
| | | pre_close_price = round(float(limit_up_price) / 1.1, 2) |
| | | # 如果涨幅大于7%就读取板块 |
| | | price_rate = (buy_1_price - pre_close_price) / pre_close_price |
| | | if price_rate > 0.07: |
| | | if not cls.__KPLCodeJXBlockManager.get_jx_blocks_cache(code): |
| | | blocks = kpl_api.getCodeJingXuanBlocks(code) |
| | | cls.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks) |
| | | elif price_rate > 0.03: |
| | | # 添加备用板块 |
| | | if not cls.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True): |
| | | blocks = kpl_api.getCodeJingXuanBlocks(code) |
| | | cls.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True) |
| | | |
| | | # 更新板块信息 |
| | | yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() |
| | | CodePlateKeyBuyManager.update_can_buy_blocks(code, |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, |
| | | yesterday_codes, |
| | | block_info.get_before_blocks_dict()) |
| | | |
| | | hx_logger_l2_market_data.info(f"{code}#{data}") |
| | | |
| | | |
| | | def clear_invalid_client(): |
| | |
| | | def OnL2Order(self, code, datas, timestamp): |
| | | TradeServerProcessor.l2_order(code, datas, timestamp) |
| | | |
| | | def OnL2Transaction(self, code, datas, timestamp): |
| | | pass |
| | | def OnL2Transaction(self, code, datas): |
| | | TradeServerProcessor.l2_transaction(code, datas) |
| | | |
| | | def OnMarketData(self, code, datas, timestamp): |
| | | pass |
| | | def OnMarketData(self, code, datas): |
| | | TradeServerProcessor.l2_market_data(code, datas) |
| | | |
| | | |
| | | class MyTradeResponse(TradeResponse): |
| | | |
| | | def OnTradeCallback(self, data_json): |
| | | data_json = data_json["data"] |
| | | type_ = data_json["type"] |
| | | # 记录交易反馈日志 |
| | | hx_logger_trade_callback.info(data_json) |
| | | # 重新请求委托列表与资金 |
| | | huaxin_trade_data_update.add_delegate_list() |
| | | huaxin_trade_data_update.add_deal_list() |
| | | huaxin_trade_data_update.add_money_list() |
| | | |
| | | def OnTradeResponse(self, data_json): |
| | | hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}") |
| | | # 设置响应内容 |
| | | trade_api.set_response(data_json["request_id"], data_json['data']) |
| | | |
| | | |
| | | # 回调 |
| | | my_l2_data_callback = MyL2DataCallback() |
| | | my_trade_response = MyTradeResponse() |
| | | |
| | | |
| | | def run(pipe_trade, pipe_l1, pipe_l2): |
| | | def run(pipe_trade, pipe_l1, pipe_l2, trade_cmd_callback): |
| | | # 执行一些初始化数据 |
| | | block_info.init() |
| | | |
| | |
| | | manager.run(blocking=False) |
| | | |
| | | # 启动交易服务 |
| | | huaxin_trade_api.run_pipe_trade(pipe_trade) |
| | | huaxin_trade_api.run_pipe_trade(pipe_trade, trade_cmd_callback) |
| | | |
| | | # 监听l1那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True) |