交易设置独立进程/删除trade_client_server/记录L2逐笔委托日志
| | |
| | | class TradeCommandManager: |
| | | trade_client_dict = {} |
| | | _instance = None |
| | | process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20) |
| | | process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30) |
| | | |
| | | def __new__(cls, *args, **kwargs): |
| | | if not cls._instance: |
| | |
| | | val = pipe_strategy.recv() |
| | | if val: |
| | | val = json.loads(val) |
| | | # print("run_process_command", val) |
| | | _type = val["type"] |
| | | threading.Thread(target=lambda: cls.process_command(_type, None, val), daemon=True).start() |
| | | cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val)) |
| | | |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | logging.exception(e) |
| | |
| | | "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], |
| | | "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], |
| | | "OrderStatus": pOrderDetail['OrderStatus'].decode()} |
| | | print("逐笔委托", item) |
| | | l2_data_manager.add_l2_order_detail(item) |
| | | |
| | | # logger_local_huaxin_l2_orderdetail.info( |
| | |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import log_export, async_log_util |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \ |
| | | logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system |
| | | logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail |
| | | from utils import tool |
| | | |
| | | order_detail_upload_active_time_dict = {} |
| | |
| | | # 添加委托详情 |
| | | def add_l2_order_detail(data, istransaction=False): |
| | | code = data["SecurityID"] |
| | | # 异步日志记录 |
| | | async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_orderdetail, data) |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | if istransaction: |
| | | pass |
| | | else: |
| | | pass |
| | | # 原来的格式 |
| | | # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], |
| | | # "Volume": pOrderDetail['Volume'], |
| | |
| | | buy_order_nos_dict[data['SecurityID']] = set() |
| | | buy_order_nos_dict[data['SecurityID']].add(data['OrderNO']) |
| | | # 买入订单号需要记录日志 |
| | | log_buy_no_queue.put_nowait((data['SecurityID'], data['OrderNO'])) |
| | | async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, f"{data['SecurityID']}#{data['OrderNO']}") |
| | | |
| | | tmep_order_detail_queue_dict[code].put( |
| | | (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | |
| | | def __run_log(): |
| | | print("__run_log") |
| | | logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | temp = log_buy_no_queue.get() |
| | | if temp: |
| | | logger_local_huaxin_l2_buy_no.info(f"{temp[0]}#{temp[1]}") |
| | | except: |
| | | pass |
| | | async_log_util.huaxin_l2_log.run_sync() |
| | | |
| | | |
| | | __upload_order_threads = {} |
| | |
| | | import json |
| | | import logging |
| | | import os |
| | | import threading |
| | | import time |
| | | |
| | | from huaxin_client import command_manager, trade_client_server |
| | | from huaxin_client import command_manager |
| | | from huaxin_client import constant |
| | | from huaxin_client import socket_util |
| | | import traderapi |
| | |
| | | return |
| | | |
| | | # 撤买 |
| | | def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None): |
| | | def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None, order_action_ref=None): |
| | | if sinfo in self.__cancel_buy_sinfo_set: |
| | | raise Exception(f'撤单请求已经提交:{sinfo}') |
| | | async_log_util.info(logger_local_huaxin_trade_debug, |
| | |
| | | req_field.OrderRef = order_ref |
| | | req_field.SessionID = self.__session_id |
| | | req_field.FrontID = self.__front_id |
| | | if order_action_ref: |
| | | req_field.OrderActionRef = order_action_ref |
| | | |
| | | # OrderActionRef报单操作引用,用法同报单引用,可根据需要选填 |
| | | |
| | |
| | | nRequestID: "int") -> "void": |
| | | try: |
| | | if pRspInfoField.ErrorID == 0: |
| | | async_log_util.info(logger_local_huaxin_trade_debug, '[%d] OnRspOrderInsert: OK! [%d]' % (round(time.time() * 1000), nRequestID)) |
| | | async_log_util.info(logger_local_huaxin_trade_debug, |
| | | '[%d] OnRspOrderInsert: OK! [%d]' % (round(time.time() * 1000), nRequestID)) |
| | | else: |
| | | async_log_util.error(logger_local_huaxin_trade_debug, |
| | | f"OnRspOrderInsert 报单出错:{pRspInfoField.ErrorID}-{pRspInfoField.ErrorMsg}") |
| | |
| | | code = data["code"] |
| | | orderSysID = data.get("orderSysID") |
| | | orderRef = data.get("orderRef") |
| | | orderActionRef = data.get("orderActionRef") |
| | | sinfo = data["sinfo"] |
| | | if direction == 1: |
| | | # 撤买 |
| | |
| | | req_rid_dict[sinfo] = (client_id, request_id, sk) |
| | | self.trade_thread_pool.submit( |
| | | lambda: self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID, |
| | | order_ref=orderRef)) |
| | | order_ref=orderRef, order_action_ref=orderActionRef)) |
| | | async_log_util.info(logger_local_huaxin_trade_debug, |
| | | f"撤单结束:code-{code} order_sys_id-{orderSysID} sinfo-{sinfo}") |
| | | except Exception as e: |
| | |
| | | # send_response( |
| | | # json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | # "request_id": request_id}), type, client_id, request_id, temp_params[2]) |
| | | |
| | | if trade_response: |
| | | trade_response.OnTradeResponse( |
| | | {"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | "request_id": request_id}) |
| | | else: |
| | | send_response( |
| | | json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | "request_id": request_id}), type, client_id, request_id, temp_params[2]) |
| | | |
| | | async_log_util.info(logger_local_huaxin_trade_debug, "API回调结束 req_id-{} request_id-{}", req_id, request_id) |
| | | else: |
| | | async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id) |
| | | if trade_response: |
| | | trade_response.OnTradeCallback({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}) |
| | | # # 非API回调 |
| | | # send_response( |
| | | # json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}), |
| | | # type, |
| | | # None, |
| | | # req_id) |
| | | else: |
| | | send_response( |
| | | json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}), |
| | | type, |
| | | None, |
| | | req_id) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | |
| | | time.sleep(2) |
| | | |
| | | |
| | | def run(trade_response_: TradeResponse, pipe_l2=None, pipe_strategy=None): |
| | | def run(trade_response_: TradeResponse=None, pipe_l2=None, pipe_strategy=None): |
| | | try: |
| | | logger_system.info("交易进程ID:{}", os.getpid()) |
| | | logger_system.info(f"trade 线程ID:{tool.get_thread_id()}") |
| | |
| | | |
| | | global trade_response |
| | | trade_response = trade_response_ |
| | | |
| | | t1 = threading.Thread(target=lambda: trade_client_server.run(), daemon=True) |
| | | t1.start() |
| | | |
| | | global tradeCommandManager |
| | | tradeCommandManager = command_manager.TradeCommandManager() |
| | |
| | | # l2数据的日志 |
| | | import time |
| | | |
| | | from log_module import log |
| | | from log_module import log, async_log_util |
| | | from l2 import l2_log |
| | | |
| | | |
| | |
| | | timestamp = int(time.time() * 1000) |
| | | # 只记录耗时较长的信息 |
| | | if time_ > 1 or force: |
| | | log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", l2_log.threadIds.get(code), timestamp, description, code, time_, |
| | | async_log_util.info(log.logger_l2_process_time, "{}-{} {}: {}-{}{}", l2_log.threadIds.get(code), timestamp, |
| | | description, code, time_, |
| | | "\n" if new_line else "") |
| | | return timestamp |
| | | |
| | | |
| | | def l2_time_log(code, description): |
| | | async_log_util.info(log.logger_l2_process_time, "{}-{}: {}", l2_log.threadIds.get(code), code, |
| | | description) |
| | | |
| | | |
| | | class TradeLog: |
| | | |
| | | def __init__(self, thread_id): |
| | |
| | | from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \ |
| | | trade_result_manager, current_price_process_manager, trade_data_manager, trade_huaxin |
| | | from l2 import safe_count_manager, l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \ |
| | | transaction_progress, cancel_buy_strategy |
| | | transaction_progress, cancel_buy_strategy, l2_data_log |
| | | from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, DCancelBigNumComputer, \ |
| | | LCancelBigNumComputer |
| | | from l2.l2_data_manager import L2DataException |
| | |
| | | # 处理华鑫L2数据 |
| | | @classmethod |
| | | def process_huaxin(cls, code, origin_datas): |
| | | print("process_huaxin", code, len(origin_datas)) |
| | | datas = None |
| | | origin_start_time = round(t.time() * 1000) |
| | | try: |
| | | l2_data_log.l2_time_log(code, "开始加载历史数据") |
| | | # 加载历史的L2数据 |
| | | is_normal = l2.l2_data_util.load_l2_data(code, load_latest=False) |
| | | if not is_normal: |
| | | print("历史数据异常:", code) |
| | | # 数据不正常需要禁止交易 |
| | | l2_trade_util.forbidden_trade(code, msg="L2历史数据异常") |
| | | origin_start_time = round(t.time() * 1000) |
| | | # 转换数据格式 |
| | | _start_index = 0 |
| | | total_datas = local_today_datas.get(code) |
| | | if total_datas: |
| | | _start_index = total_datas[-1]["index"] + 1 |
| | | l2_data_log.l2_time_log(code, "开始格式化原始数据") |
| | | datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas, |
| | | gpcode_manager.get_limit_up_price(code), _start_index) |
| | | __start_time = round(t.time() * 1000) |
| | | l2_data_log.l2_time_log(code, "开始处理数据") |
| | | if len(datas) > 0: |
| | | cls.process_add_datas(code, datas, 0, __start_time) |
| | | else: |
| | | pass |
| | | # lp = LineProfiler() |
| | | # lp.enable() |
| | | # lp_wrap = lp(cls.process_add_datas) |
| | | # lp_wrap(code, datas, 0, __start_time) |
| | | # output = io.StringIO() |
| | | # lp.print_stats(stream=output) |
| | | # lp.disable() |
| | | # with open(f"/home/logs/profile/{code}_{datas[0]['index']}_{datas[-1]['index']}.txt", 'w') as f: |
| | | # f.write(output.getvalue()) |
| | | # lp.dump_stats(f"/home/logs/profile/{code}_{round(t.time() * 1000)}.txt") |
| | | except Exception as e: |
| | | async_log_util.error(logger_l2_error,f"code:{code}") |
| | | async_log_util.exception(logger_l2_error, e) |
| | | finally: |
| | | # l2_data_log.l2_time(code, round(t.time() * 1000) - origin_start_time, |
| | | # "l2数据处理总耗时", |
| | | # True) |
| | | if datas: |
| | | l2_data_log.l2_time_log(code, "开始保存数据") |
| | | l2.l2_data_util.save_l2_data(code, None, datas) |
| | | |
| | | @classmethod |
| | |
| | | end_index = len(total_datas) - 1 |
| | | if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS: |
| | | # 已挂单 |
| | | if True: # len(add_datas) < 10: |
| | | cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code) |
| | | else: |
| | | pass |
| | | # lp = LineProfiler() |
| | | # lp.enable() |
| | | # lp_wrap = lp(cls.__process_order) |
| | | # lp_wrap(code, start_index, end_index, capture_timestamp, is_first_code) |
| | | # output = io.StringIO() |
| | | # lp.print_stats(stream=output) |
| | | # lp.disable() |
| | | # with open( |
| | | # f"/home/logs/profile/{code}_process_order_{add_datas[0]['index']}_{add_datas[-1]['index']}.txt", |
| | | # 'w') as f: |
| | | # f.write(output.getvalue()) |
| | | else: |
| | | # 未挂单,时间相差不大才能挂单 |
| | | if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time): |
| | |
| | | add_datas[0]["index"], |
| | | add_datas[-1]["index"], round(t.time() * 1000) - __start_time, |
| | | capture_timestamp) |
| | | # __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, |
| | | # "l2数据处理时间") |
| | | |
| | | # 处理未挂单 |
| | | @classmethod |
| | |
| | | return b_cancel_data, "S大单撤销比例触发阈值" |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}") |
| | | async_log_util.error(logger_l2_error, |
| | | f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}") |
| | | async_log_util.exception(logger_l2_error, e) |
| | | finally: |
| | | # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, |
| | |
| | | if b_need_cancel and b_cancel_data: |
| | | return b_cancel_data, "H撤销比例触发阈值" |
| | | except Exception as e: |
| | | async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}") |
| | | async_log_util.error(logger_l2_error, |
| | | f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}") |
| | | async_log_util.exception(logger_l2_error, e) |
| | | finally: |
| | | # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-H撤大单计算") |
| | |
| | | if b_need_cancel and b_cancel_data: |
| | | return b_cancel_data, "L撤销比例触发阈值" |
| | | except Exception as e: |
| | | async_log_util.error(logger_l2_error, f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}") |
| | | async_log_util.error(logger_l2_error, |
| | | f"参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index}") |
| | | async_log_util.exception(logger_l2_error, e) |
| | | finally: |
| | | # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-L撤大单计算") |
| | |
| | | |
| | | unique_key = f"{compute_start_index}-{compute_end_index}" |
| | | if cls.__latest_process_not_order_unique_keys.get(code) == unique_key: |
| | | async_log_util.error(logger_l2_error, f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}") |
| | | async_log_util.error(logger_l2_error, |
| | | f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}") |
| | | return |
| | | cls.__latest_process_not_order_unique_keys[code] = unique_key |
| | | |
| | |
| | | from db import redis_manager_delegate as redis_manager |
| | | from utils import tool |
| | | |
| | | __db = 1 |
| | | _redisManager = redis_manager.RedisManager(1) |
| | | # l2数据管理 |
| | | # 本地最新一次上传的数据 |
| | |
| | | # 保存l2数据 |
| | | def save_l2_data(code, datas, add_datas): |
| | | # 只有有新曾数据才需要保存 |
| | | if len(add_datas) > 0: |
| | | if add_datas: |
| | | # 保存最近的数据 |
| | | __start_time = round(time.time() * 1000) |
| | | if datas: |
| | | RedisUtils.setex_async(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(), |
| | | RedisUtils.setex_async(__db, "l2-data-latest-{}".format(code), tool.get_expire(), |
| | | json.dumps(datas)) |
| | | # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时") |
| | | # 设置进内存 |
| | |
| | | async_log_util.l2_data_log.info(log.logger_l2_data, f"{code}-{add_datas}") |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | # 暂时不将数据保存到redis |
| | | # saveL2Data(code, add_datas) |
| | | |
| | | |
| | | # 设置最新的l2数据采集的数量 |
| | |
| | | l2_data_log = AsyncLogManager() |
| | | |
| | | |
| | | huaxin_l2_log = AsyncLogManager() |
| | | |
| | | |
| | | log_queue = queue.Queue() |
| | | |
| | | |
| | |
| | | for line in lines: |
| | | if line: |
| | | data = line.split(" - ")[1].strip() |
| | | if data.startswith("["): |
| | | data = data[data.find("]") + 1:].strip() |
| | | code = data.split("#")[0] |
| | | buy_no = int(data.split("#")[1]) |
| | | if code not in fdatas: |
| | |
| | | t1.start() |
| | | # |
| | | # 启动华鑫交易服务 |
| | | t1 = threading.Thread( |
| | | target=lambda: trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd), |
| | | name="trade_server", daemon=True) |
| | | t1.start() |
| | | |
| | | huaxin_client.trade_client.run(trade_server.my_trade_response, ptl2_trade, pst_trade) |
| | | trade_server.run(pipe_trade, pipe_l1, pipe_l2, huaxin_client.trade_client.process_cmd) |
| | | |
| | | |
| | | # 主服务 |
| | |
| | | l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,)) |
| | | l1Process.start() |
| | | |
| | | # 交易进程 |
| | | tradeProcess = multiprocessing.Process( |
| | | target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, pst_trade)) |
| | | tradeProcess.start() |
| | | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade, |
| | | pst_trade) |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
| | | tradeProcess.join() |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | |
| | | |
| | | # pipe的交易通道是否正常 |
| | | def is_pipe_channel_normal(): |
| | | return False |
| | | return True |
| | | |
| | | |
| | | # 测试交易通道 |
| | |
| | | def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None): |
| | | if not sinfo: |
| | | sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}" |
| | | order_action_ref = huaxin_util.create_order_ref() |
| | | request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE, |
| | | {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2, |
| | | "direction": direction, |
| | | "code": code, |
| | | "orderRef": orderRef, |
| | | "orderActionRef": order_action_ref, |
| | | "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking, |
| | | is_pipe=is_pipe_channel_normal()) |
| | | try: |
| | |
| | | from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from huaxin_client.trade_transform_protocol import TradeResponse |
| | | from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress |
| | | from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress, \ |
| | | l2_data_log |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \ |
| | | GCancelBigNumComputer, SecondCancelBigNumComputer |
| | | from l2.huaxin import huaxin_target_codes_manager |
| | |
| | | from log_module import async_log_util, log_export |
| | | from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \ |
| | | hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue, \ |
| | | logger_l2_g_cancel, logger_debug, logger_system, logger_trade |
| | | logger_l2_g_cancel, logger_debug, logger_system, logger_trade, logger_l2_process |
| | | from third_data import block_info, kpl_api, kpl_data_manager |
| | | from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager |
| | | from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils |
| | |
| | | f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}") |
| | | thread_id = random.randint(0, 100000) |
| | | l2_log.threadIds[code] = thread_id |
| | | # async_log_util.info(hx_logger_l2_upload, f"{code}数据处理开始:{thread_id}") |
| | | l2_data_log.l2_time_log(code, "开始处理L2逐笔委托") |
| | | try: |
| | | l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas) |
| | | finally: |
| | | pass |
| | | # async_log_util.info(hx_logger_l2_upload, f"{code}数据处理结束:{thread_id}") |
| | | async_log_util.info(logger_l2_process, "code:{} 处理数据数量: {} 最终处理时间:{}", code, |
| | | len(_datas), |
| | | round(time.time() * 1000) - now_timestamp) |
| | | l2_data_log.l2_time_log(code, "处理L2逐笔委托结束") |
| | | |
| | | @classmethod |
| | | def l2_transaction(cls, code, datas): |