| | |
| | | |
| | | global l2_data_callback |
| | | l2_data_callback = _l2_data_callback |
| | | |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_upload_trading_canceled() |
| | | l2_data_manager.run_upload_common(l2_data_callback) |
| | | l2_data_manager.run_upload_trading_canceled(l2_data_callback) |
| | | l2_data_manager.run_log() |
| | | # l2_data_manager.run_test(l2_data_callback) |
| | | global l2CommandManager |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | run(None, None) |
| | | run(None, None, None) |
| | | # spi.set_codes_data([("000333", 12000)]) |
| | | input() |
| | |
| | | if udatas: |
| | | start_time = time.time() |
| | | # upload_data(code, "l2_order", udatas) |
| | | l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) |
| | | l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) |
| | | # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas) |
| | | use_time = int((time.time() - start_time) * 1000) |
| | | if use_time > 20: |
| | |
| | | pass |
| | | |
| | | |
| | | def __run_upload_trans(code): |
| | | def __run_upload_trans(code, l2_data_callback: L2DataCallBack): |
| | | if code not in tmep_transaction_queue_dict: |
| | | tmep_transaction_queue_dict[code] = queue.Queue() |
| | | while True: |
| | |
| | | temp = tmep_transaction_queue_dict[code].get() |
| | | udatas.append(temp) |
| | | if udatas: |
| | | upload_data(code, "l2_trans", udatas) |
| | | # upload_data(code, "l2_trans", udatas) |
| | | l2_data_callback.OnL2Transaction(code, udatas) |
| | | time.sleep(0.01) |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}") |
| | | |
| | | |
| | | def __run_upload_common(): |
| | | def __run_upload_common(l2_data_callback: L2DataCallBack): |
| | | print("__run_upload_common") |
| | | while True: |
| | | try: |
| | | while not common_queue.empty(): |
| | | temp = common_queue.get() |
| | | upload_data(temp[0], temp[1], temp[2]) |
| | | if temp[1] == "l2_market_data": |
| | | l2_data_callback.OnMarketData(temp[0], temp[2]) |
| | | else: |
| | | upload_data(temp[0], temp[1], temp[2]) |
| | | time.sleep(0.01) |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.exception(e) |
| | | logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}") |
| | | |
| | | |
| | | def __run_upload_trading_canceled(): |
| | | def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack): |
| | | print("__run_upload_trading_canceled") |
| | | while True: |
| | | try: |
| | | temp = trading_canceled_queue.get() |
| | | if temp: |
| | | logger_local_huaxin_g_cancel.info(f"准备上报:{temp}") |
| | | upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True) |
| | | # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True) |
| | | l2_data_callback.OnTradingOrderCancel(temp[0], temp[1]) |
| | | logger_local_huaxin_g_cancel.info(f"上报成功:{temp}") |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.exception(e) |
| | |
| | | t.start() |
| | | |
| | | if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2: |
| | | t = threading.Thread(target=lambda: __run_upload_trans(code), daemon=True) |
| | | t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | def run_upload_common(): |
| | | t = threading.Thread(target=lambda: __run_upload_common(), daemon=True) |
| | | def run_upload_common(l2_data_callback: L2DataCallBack): |
| | | t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | def run_upload_trading_canceled(): |
| | | t = threading.Thread(target=lambda: __run_upload_trading_canceled(), daemon=True) |
| | | def run_upload_trading_canceled(l2_data_callback: L2DataCallBack): |
| | | t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | |
| | | def OnL2Order(self, code, datas, timestamp): |
| | | pass |
| | | |
| | | def OnL2Transaction(self, code, datas, timestamp): |
| | | def OnL2Transaction(self, code, datas): |
| | | pass |
| | | |
| | | def OnMarketData(self, code, datas, timestamp): |
| | | def OnMarketData(self, code, datas): |
| | | pass |
| | | |
| | | def OnTradingOrderCancel(self, code, buy_no): |
| | | pass |
| | |
| | | from huaxin_client.log import logger |
| | | |
| | | # 正式账号 |
| | | from huaxin_client.trade_transform_protocol import TradeRequest, TradeResponse |
| | | from log_module.log import logger_local_huaxin_trade_debug, logger_system |
| | | |
| | | UserID = '388000013349' |
| | |
| | | |
| | | logger_local_huaxin_trade_debug.info("API回调 request_id-{}", request_id) |
| | | # 测试 |
| | | send_response( |
| | | json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | "request_id": request_id}), type, client_id, request_id, temp_params[2]) |
| | | # send_response( |
| | | # json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | # "request_id": request_id}), type, client_id, request_id, temp_params[2]) |
| | | |
| | | trade_response.OnTradeResponse({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, "request_id": request_id}) |
| | | |
| | | logger_local_huaxin_trade_debug.info("API回调结束 req_id-{} request_id-{}", req_id, request_id) |
| | | print("API回调结束") |
| | | else: |
| | | logger_local_huaxin_trade_debug.info("非API回调 req_id-{}", req_id) |
| | | print("非API回调") |
| | | # 非API回调 |
| | | send_response( |
| | | json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}), |
| | | type, |
| | | None, |
| | | req_id) |
| | | trade_response.OnTradeCallback({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}) |
| | | # # 非API回调 |
| | | # send_response( |
| | | # json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}), |
| | | # type, |
| | | # None, |
| | | # req_id) |
| | | print("非API结束") |
| | | except Exception as e: |
| | | logging.exception(e) |
| | |
| | | addr, port = constant.SERVER_IP, constant.SERVER_PORT |
| | | |
| | | |
| | | def run(pipe_l2=None, pipe_strategy=None): |
| | | def process_cmd(tradeRequest: TradeRequest): |
| | | tradeCommandManager.process_command(tradeRequest.type_, None, tradeRequest.data) |
| | | |
| | | |
| | | def run(trade_response_: TradeResponse, pipe_l2=None, pipe_strategy=None): |
| | | logger_system.info("交易进程ID:{}", os.getpid()) |
| | | __init_trade_data_server() |
| | | global l2pipe |
| | |
| | | global strategy_pipe |
| | | strategy_pipe = pipe_strategy |
| | | |
| | | global trade_response |
| | | trade_response = trade_response_ |
| | | |
| | | t1 = threading.Thread(target=lambda: trade_client_server.run(), daemon=True) |
| | | t1.start() |
| | | |
| | |
| | | tradeCommandManager = command_manager.TradeCommandManager() |
| | | tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy) |
| | | logger_system.info("华鑫交易服务启动") |
| | | tradeCommandManager.run() |
| | | # 不需要运行命令解析 |
| | | # tradeCommandManager.run() |
| | | while True: |
| | | time.sleep(2) |
| | | |
New file |
| | |
| | | class TradeRequest: |
| | | def __init__(self, type_, data, request_id): |
| | | self.type_ = type_ |
| | | self.data = data |
| | | self.request_id = request_id |
| | | |
| | | |
| | | class TradeResponse: |
| | | def OnTradeResponse(self, data_json): |
| | | pass |
| | | |
| | | def OnTradeCallback(self, data_json): |
| | | pass |
| | |
| | | # from huaxin_api import trade_client, l2_client, l1_client |
| | | |
| | | |
| | | def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2,ptl2_l2,psl2_l2): |
| | | def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, ptl2_trade, pst_trade): |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | |
| | | t1 = threading.Thread(target=huaxin_client.l2_client.run, args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback), daemon=True) |
| | | t1.start() |
| | | |
| | | # 启动华鑫交易服务 |
| | | t1 = threading.Thread(target=huaxin_client.trade_client.run, args=(trade_server.my_trade_response, ptl2_trade, pst_trade), |
| | | daemon=True) |
| | | t1.start() |
| | | |
| | | # 交易服务 |
| | | trade_server.run(pipe_trade, pipe_l1, pipe_l2) |
| | | trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd) |
| | | |
| | | |
| | | # 主服务 |
| | |
| | | logger_system.info("主进程ID:{}", os.getpid()) |
| | | |
| | | tradeServerProcess = multiprocessing.Process(target=createTradeServer, |
| | | args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy,ptl2_l2, psl2_l2)) |
| | | args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy,ptl2_l2, psl2_l2,ptl2_trade,pst_trade)) |
| | | tradeServerProcess.start() |
| | | |
| | | # |
| | | tradeProcess = multiprocessing.Process(target=huaxin_client.trade_client.run, args=(ptl2_trade, pst_trade,)) |
| | | tradeProcess.start() |
| | | # L1订阅数据 |
| | | huaxin_client.l1_client.run(pl1t_l1) |
| | | # 将tradeServer作为主进程 |
| | | tradeServerProcess.join() |
| | | tradeProcess.join() |
| | |
| | | import time |
| | | |
| | | from huaxin_client import trade_client_server |
| | | from huaxin_client.trade_transform_protocol import TradeRequest |
| | | from log_module import async_log_util |
| | | from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, logger_trade |
| | | from trade.huaxin import huaxin_trade_data_update |
| | |
| | | |
| | | |
| | | # 设置交易通信队列 |
| | | def run_pipe_trade(pipe_trade_): |
| | | def run_pipe_trade(pipe_trade_, trade_cmd_callback_): |
| | | global pipe_trade |
| | | pipe_trade = pipe_trade_ |
| | | global trade_cmd_callback |
| | | trade_cmd_callback = trade_cmd_callback_ |
| | | |
| | | t1 = threading.Thread(target=lambda: __run_recv_pipe_trade(), daemon=True) |
| | | t1.start() |
| | | |
| | |
| | | |
| | | # pipe的交易通道是否正常 |
| | | def is_pipe_channel_normal(): |
| | | return trade_pipe_channel_error_count < 3 |
| | | return False |
| | | |
| | | |
| | | # 测试交易通道 |
| | |
| | | if not request_id: |
| | | request_id = __get_request_id(_type) |
| | | try: |
| | | async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{} is_pipe-{}", 0, request_id, is_pipe) |
| | | async_log_util.info(hx_logger_trade_loop, "请求发送开始:client_id-{} request_id-{} is_pipe-{}", 0, request_id, |
| | | is_pipe) |
| | | root_data = {"type": _type, |
| | | "data": data, |
| | | "request_id": request_id} |
| | | root_data = socket_util.encryp_client_params_sign(root_data) |
| | | |
| | | if not is_pipe: |
| | | sk = socket_util.create_socket("127.0.0.1", trade_client_server.SERVER_PORT) |
| | | sk.sendall(socket_util.load_header(json.dumps(root_data).encode("utf-8"))) |
| | | try: |
| | | if blocking: |
| | | result_data, header_str = socket_util.recv_data(sk) |
| | | if result_data: |
| | | data_json = json.loads(result_data) |
| | | set_response(request_id, data_json["data"]) |
| | | finally: |
| | | sk.close() |
| | | trade_cmd_callback(TradeRequest(_type, root_data, request_id)) |
| | | else: |
| | | pipe_trade.send(json.dumps(root_data).encode("utf-8")) |
| | | async_log_util.info(hx_logger_trade_loop, "请求发送成功:request_id-{}", request_id) |
| | |
| | | __TradeOrderIdManager.remove_local_order_id(code, localOrderId) |
| | | except: |
| | | pass |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | else: |
| | | # 被动触发 |
| | | pass |
| | |
| | | "local_order_id": local_order_id, |
| | | "volume": volume, |
| | | "price_type": price_type, |
| | | "price": price, "sinfo": sinfo}, request_id=request_id, blocking=blocking, is_pipe=is_pipe_channel_normal()) |
| | | "price": price, "sinfo": sinfo}, request_id=request_id, blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal()) |
| | | if blocking: |
| | | try: |
| | | return __read_response(request_id, blocking) |
| | |
| | | "direction": direction, |
| | | "code": code, |
| | | "localOrderID": localOrderID, |
| | | "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking, is_pipe=is_pipe_channel_normal()) |
| | | "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal()) |
| | | try: |
| | | return __read_response(request_id, blocking) |
| | | finally: |
| | |
| | | # 获取成交列表 |
| | | def get_deal_list(blocking=True): |
| | | request_id = __request(ClientSocketManager.CLIENT_TYPE_DEAL_LIST, |
| | | {"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST}, blocking=blocking, is_pipe=is_pipe_channel_normal()) |
| | | {"type": ClientSocketManager.CLIENT_TYPE_DEAL_LIST}, blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal()) |
| | | return __read_response(request_id, blocking) |
| | | |
| | | |
| | | # 获取持仓列表 |
| | | def get_position_list(blocking=True): |
| | | request_id = __request(ClientSocketManager.CLIENT_TYPE_POSITION_LIST, |
| | | {"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST}, blocking=blocking, is_pipe=is_pipe_channel_normal()) |
| | | {"type": ClientSocketManager.CLIENT_TYPE_POSITION_LIST}, blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal()) |
| | | return __read_response(request_id, blocking) |
| | | |
| | | |
| | | # 获取账户资金状况 |
| | | def get_money(blocking=True): |
| | | request_id = __request(ClientSocketManager.CLIENT_TYPE_MONEY, |
| | | {"type": ClientSocketManager.CLIENT_TYPE_MONEY}, blocking=blocking, is_pipe=is_pipe_channel_normal()) |
| | | {"type": ClientSocketManager.CLIENT_TYPE_MONEY}, blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal()) |
| | | return __read_response(request_id, blocking) |
| | | |
| | | |
| | | # 设置L2订阅数据 |
| | | def set_l2_codes_data(codes_data, blocking=True): |
| | | request_id = __request(ClientSocketManager.CLIENT_TYPE_CMD_L2, |
| | | {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": codes_data}, blocking=blocking, is_pipe=is_pipe_channel_normal()) |
| | | {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": codes_data}, blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal()) |
| | | return __read_response(request_id, blocking) |
| | | |
| | | |
| | |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from huaxin_client.trade_transform_protocol import TradeResponse |
| | | from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \ |
| | | GCancelBigNumComputer, SecondCancelBigNumComputer |
| | |
| | | elif data_json["type"] == "response": |
| | | # 主动触发的响应 |
| | | try: |
| | | client_id = data_json.get("client_id") |
| | | hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}") |
| | | # 设置响应内容 |
| | | trade_api.set_response(data_json["request_id"], data_json['data']) |
| | | my_trade_response.OnTradeResponse(data_json) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "trade_callback": |
| | | try: |
| | | # 交易回调 |
| | | data_json = data_json["data"] |
| | | type_ = data_json["type"] |
| | | # 记录交易反馈日志 |
| | | hx_logger_trade_callback.info(data_json) |
| | | # 重新请求委托列表与资金 |
| | | huaxin_trade_data_update.add_delegate_list() |
| | | huaxin_trade_data_update.add_deal_list() |
| | | huaxin_trade_data_update.add_money_list() |
| | | # print("响应结果:", data_json['data']) |
| | | my_trade_response.OnTradeCallback(data_json) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "l2_order": |
| | |
| | | data = data_json["data"] |
| | | code = data["code"] |
| | | datas = data["data"] |
| | | hx_logger_l2_transaction.info(f"{code}#{datas}") |
| | | if datas: |
| | | # 设置成交价 |
| | | current_price_process_manager.set_trade_price(code, datas[-1][1]) |
| | | try: |
| | | buyno_map = l2_data_util.local_today_buyno_map.get(code) |
| | | hx_logger_l2_transaction.info( |
| | | f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}") |
| | | buy_progress_index = None |
| | | for i in range(len(datas) - 1, -1, -1): |
| | | d = datas[i] |
| | | buy_no = f"{d[6]}" |
| | | if buyno_map and buy_no in buyno_map: |
| | | hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}") |
| | | buy_progress_index = buyno_map[buy_no]["index"] |
| | | break |
| | | |
| | | # 获取执行位时间 |
| | | buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( |
| | | code) |
| | | if True: |
| | | if buy_progress_index is not None: |
| | | self.__TradeBuyQueue.set_traded_index(code, buy_progress_index) |
| | | total_datas = l2_data_util.local_today_datas.get(code) |
| | | num_operate_map = l2_data_util.local_today_num_operate_map.get( |
| | | code) |
| | | logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code, |
| | | buy_progress_index) |
| | | buy_time = total_datas[buy_progress_index]["val"]["time"] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if buy_exec_index: |
| | | need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code, |
| | | buy_progress_index, |
| | | buy_exec_index, |
| | | total_datas, |
| | | num_operate_map, |
| | | num * 100 * float( |
| | | limit_up_price), |
| | | limit_up_price) |
| | | if need_cancel: |
| | | L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel") |
| | | |
| | | f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time, |
| | | buy_exec_index, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code, |
| | | buy_progress_index, |
| | | total_datas) |
| | | f3 = dask.delayed( |
| | | deal_big_money_manager.DealComputeProgressManager().set_trade_progress)( |
| | | code, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | |
| | | f4 = dask.delayed( |
| | | SecondCancelBigNumComputer().set_transaction_index)( |
| | | code, |
| | | buy_progress_index) |
| | | |
| | | dask.compute(f1, f2, f3, f4) |
| | | except Exception as e: |
| | | hx_logger_l2_transaction.exception(e) |
| | | TradeServerProcessor.l2_transaction(code, datas) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "l2_market_data": |
| | |
| | | data = data_json["data"] |
| | | code = data["code"] |
| | | data = data["data"] |
| | | time_str = f"{data['dataTimeStamp']}" |
| | | if time_str.startswith("9"): |
| | | time_str = "0" + time_str |
| | | time_str = time_str[:6] |
| | | time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}" |
| | | buy_1_price, buy_1_volume = data["buy"][0] |
| | | sell_1_price, sell_1_volume = data["sell"][0] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | | if limit_up_price is not None: |
| | | # 处理买1,卖1信息 |
| | | code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str, |
| | | limit_up_price, |
| | | sell_1_price, sell_1_volume // 100) |
| | | pre_close_price = round(float(limit_up_price) / 1.1, 2) |
| | | # 如果涨幅大于7%就读取板块 |
| | | price_rate = (buy_1_price - pre_close_price) / pre_close_price |
| | | if price_rate > 0.07: |
| | | if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code): |
| | | blocks = kpl_api.getCodeJingXuanBlocks(code) |
| | | self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks) |
| | | elif price_rate > 0.03: |
| | | # 添加备用板块 |
| | | if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True): |
| | | blocks = kpl_api.getCodeJingXuanBlocks(code) |
| | | self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True) |
| | | |
| | | # 更新板块信息 |
| | | yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() |
| | | CodePlateKeyBuyManager.update_can_buy_blocks(code, |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, |
| | | yesterday_codes, |
| | | block_info.get_before_blocks_dict()) |
| | | |
| | | hx_logger_l2_market_data.info(f"{code}#{data}") |
| | | TradeServerProcessor.l2_market_data(code, data) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "l2_subscript_codes": |
| | |
| | | |
| | | # 交易服务处理器 |
| | | class TradeServerProcessor: |
| | | __TradeBuyQueue = transaction_progress.TradeBuyQueue() |
| | | __KPLCodeJXBlockManager = KPLCodeJXBlockManager() |
| | | |
| | | # 设置目标代码 |
| | | @classmethod |
| | | def set_target_codes(cls, data_json): |
| | |
| | | f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}") |
| | | l2_log.threadIds[code] = random.randint(0, 100000) |
| | | l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas) |
| | | |
| | | @classmethod |
| | | def l2_transaction(cls, code, datas): |
| | | hx_logger_l2_transaction.info(f"{code}#{datas}") |
| | | if datas: |
| | | # 设置成交价 |
| | | current_price_process_manager.set_trade_price(code, datas[-1][1]) |
| | | try: |
| | | buyno_map = l2_data_util.local_today_buyno_map.get(code) |
| | | hx_logger_l2_transaction.info( |
| | | f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}") |
| | | buy_progress_index = None |
| | | for i in range(len(datas) - 1, -1, -1): |
| | | d = datas[i] |
| | | buy_no = f"{d[6]}" |
| | | if buyno_map and buy_no in buyno_map: |
| | | hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}") |
| | | buy_progress_index = buyno_map[buy_no]["index"] |
| | | break |
| | | |
| | | # 获取执行位时间 |
| | | buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( |
| | | code) |
| | | if True: |
| | | if buy_progress_index is not None: |
| | | cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index) |
| | | total_datas = l2_data_util.local_today_datas.get(code) |
| | | num_operate_map = l2_data_util.local_today_num_operate_map.get( |
| | | code) |
| | | logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code, |
| | | buy_progress_index) |
| | | buy_time = total_datas[buy_progress_index]["val"]["time"] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if buy_exec_index: |
| | | need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code, |
| | | buy_progress_index, |
| | | buy_exec_index, |
| | | total_datas, |
| | | num_operate_map, |
| | | num * 100 * float( |
| | | limit_up_price), |
| | | limit_up_price) |
| | | if need_cancel: |
| | | L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel") |
| | | |
| | | f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time, |
| | | buy_exec_index, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code, |
| | | buy_progress_index, |
| | | total_datas) |
| | | f3 = dask.delayed( |
| | | deal_big_money_manager.DealComputeProgressManager().set_trade_progress)( |
| | | code, |
| | | buy_progress_index, |
| | | total_datas, |
| | | num_operate_map) |
| | | |
| | | f4 = dask.delayed( |
| | | SecondCancelBigNumComputer().set_transaction_index)( |
| | | code, |
| | | buy_progress_index) |
| | | |
| | | dask.compute(f1, f2, f3, f4) |
| | | except Exception as e: |
| | | hx_logger_l2_transaction.exception(e) |
| | | |
| | | @classmethod |
| | | def l2_market_data(cls, code, data): |
| | | time_str = f"{data['dataTimeStamp']}" |
| | | if time_str.startswith("9"): |
| | | time_str = "0" + time_str |
| | | time_str = time_str[:6] |
| | | time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}" |
| | | buy_1_price, buy_1_volume = data["buy"][0] |
| | | sell_1_price, sell_1_volume = data["sell"][0] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | | if limit_up_price is not None: |
| | | # 处理买1,卖1信息 |
| | | code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str, |
| | | limit_up_price, |
| | | sell_1_price, sell_1_volume // 100) |
| | | pre_close_price = round(float(limit_up_price) / 1.1, 2) |
| | | # 如果涨幅大于7%就读取板块 |
| | | price_rate = (buy_1_price - pre_close_price) / pre_close_price |
| | | if price_rate > 0.07: |
| | | if not cls.__KPLCodeJXBlockManager.get_jx_blocks_cache(code): |
| | | blocks = kpl_api.getCodeJingXuanBlocks(code) |
| | | cls.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks) |
| | | elif price_rate > 0.03: |
| | | # 添加备用板块 |
| | | if not cls.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True): |
| | | blocks = kpl_api.getCodeJingXuanBlocks(code) |
| | | cls.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True) |
| | | |
| | | # 更新板块信息 |
| | | yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() |
| | | CodePlateKeyBuyManager.update_can_buy_blocks(code, |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, |
| | | yesterday_codes, |
| | | block_info.get_before_blocks_dict()) |
| | | |
| | | hx_logger_l2_market_data.info(f"{code}#{data}") |
| | | |
| | | |
| | | def clear_invalid_client(): |
| | |
| | | def OnL2Order(self, code, datas, timestamp): |
| | | TradeServerProcessor.l2_order(code, datas, timestamp) |
| | | |
| | | def OnL2Transaction(self, code, datas, timestamp): |
| | | pass |
| | | def OnL2Transaction(self, code, datas): |
| | | TradeServerProcessor.l2_transaction(code, datas) |
| | | |
| | | def OnMarketData(self, code, datas, timestamp): |
| | | pass |
| | | def OnMarketData(self, code, datas): |
| | | TradeServerProcessor.l2_market_data(code, datas) |
| | | |
| | | |
| | | class MyTradeResponse(TradeResponse): |
| | | |
| | | def OnTradeCallback(self, data_json): |
| | | data_json = data_json["data"] |
| | | type_ = data_json["type"] |
| | | # 记录交易反馈日志 |
| | | hx_logger_trade_callback.info(data_json) |
| | | # 重新请求委托列表与资金 |
| | | huaxin_trade_data_update.add_delegate_list() |
| | | huaxin_trade_data_update.add_deal_list() |
| | | huaxin_trade_data_update.add_money_list() |
| | | |
| | | def OnTradeResponse(self, data_json): |
| | | hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}") |
| | | # 设置响应内容 |
| | | trade_api.set_response(data_json["request_id"], data_json['data']) |
| | | |
| | | |
| | | # 回调 |
| | | my_l2_data_callback = MyL2DataCallback() |
| | | my_trade_response = MyTradeResponse() |
| | | |
| | | |
| | | def run(pipe_trade, pipe_l1, pipe_l2): |
| | | def run(pipe_trade, pipe_l1, pipe_l2, trade_cmd_callback): |
| | | # 执行一些初始化数据 |
| | | block_info.init() |
| | | |
| | |
| | | manager.run(blocking=False) |
| | | |
| | | # 启动交易服务 |
| | | huaxin_trade_api.run_pipe_trade(pipe_trade) |
| | | huaxin_trade_api.run_pipe_trade(pipe_trade, trade_cmd_callback) |
| | | |
| | | # 监听l1那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True) |