| | |
| | | "passwd": "Yeshi2016@" |
| | | } |
| | | |
| | | MAX_L2_CHANNEL_COUNT = 10 |
| | | |
| | | # 获取根路径 |
| | | def get_path_prefix(): |
New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import os |
| | | import queue |
| | | import threading |
| | | import time |
| | | import concurrent.futures |
| | | from typing import List |
| | | |
| | | from huaxin_client import command_manager, l2_data_transform_protocol |
| | | from huaxin_client import constant |
| | | from huaxin_client import l2_data_manager |
| | | import lev2mdapi |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager |
| | | from huaxin_client.command_manager import L2ActionCallback |
| | | from huaxin_client.l2_data_manager import L2DataUploadManager |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import log, async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \ |
| | | logger_local_huaxin_g_cancel, logger_l2_codes_subscript |
| | | from utils import tool |
| | | |
# --- Class-B environment endpoints ---
# TCP front address (registered only when the API is not in multicast mode).
Front_Address = "tcp://10.0.1.101:6900"
# Multicast feed addresses.
Multicast_Address = "udp://224.224.2.19:7889"
Multicast_Address2 = "udp://224.224.224.234:7890"
# Local interface address passed along with the multicast address.
Local_Interface_Address = constant.LOCAL_IP

# Per-feed subscription switches; every one starts out disabled.
g_SubMarketData = g_SubTransaction = g_SubOrderDetail = False
g_SubXTSTick = g_SubXTSMarketData = False
g_SubNGTSTick = False
g_SubBondMarketData = g_SubBondTransaction = g_SubBondOrderDetail = False

# Sample security lists (bytes codes), grouped by market.
SH_Securities = [
    b"603000", b"600225", b"600469", b"600616", b"600059", b"002849",
    b"605188", b"603630", b"600105", b"603773", b"603915", b"603569",
    b"603322", b"603798", b"605198", b"603079", b"600415", b"600601",
]
SH_XTS_Securities = [b"018003", b"113565"]

SZ_Securities = [
    b"002456", b"002849", b"002281", b"002336", b"000936",
    b"000920", b"000757", b"002896", b"002725", b"000952",
    b"000526", b"000753", b"000681", b"002088", b"002436",
]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]

# Module-level containers shared across this module.
set_codes_data_queue = queue.Queue()
market_code_dict = dict()
| | | |
| | | |
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    """SPI callback handler for the Level-2 market-data API.

    Keeps the subscribed code set in sync with the requested target list,
    persists the latest target list to ``constant.L2_CODES_INFO_PATH`` and
    forwards order-detail / transaction ticks to the upload manager.
    """

    latest_codes_set = set()

    special_code_volume_for_order_dict = {}
    # Codes whose order-detail subscription has been acknowledged
    subscripted_codes = set()
    # Per-code unique index of the last transaction
    __last_transaction_keys_dict = {}

    def __init__(self, api, l2_data_upload_manager: L2DataUploadManager):
        """Store the API handle and the upload manager; login state starts False."""
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
        self.is_login = False
        self.l2_data_upload_manager = l2_data_upload_manager

    def __split_codes(self, codes):
        """Split string codes by prefix into (sse_codes, szse_codes) as bytes.

        Codes starting with "60" go to the first list, codes starting with
        "00" to the second; any other prefix is ignored.
        """
        sse_codes = [code.encode() for code in codes if code.startswith("60")]
        szse_codes = [code.encode() for code in codes if code.startswith("00")]
        return sse_codes, szse_codes

    # Cancel subscriptions
    def __unsubscribe(self, _codes):
        """Unsubscribe order-detail and transaction feeds for ``_codes``."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}")
        if sh:
            # Order detail, then transactions
            self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
        if sz:
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)

    # Add subscriptions
    def __subscribe(self, _codes):
        """Subscribe order-detail and transaction feeds for ``_codes`` and log the results."""
        sh, sz = self.__split_codes(_codes)
        logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}")
        logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}")
        if sh:
            # Order detail
            result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}")
            # Transactions
            result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}")
        if sz:
            result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz:{result}")
            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}")

    def __process_codes_data(self, codes, from_cache=False, delay=0.0):
        """Make the subscription set match ``codes``.

        :param codes: iterable of target codes.
        :param from_cache: only affects the log text (marks a cache restore).
        :param delay: seconds to sleep before computing the diff.
        :raises Exception: when not logged in and ``constant.TEST`` is falsy.
        """
        codes = set(codes)
        if not self.is_login and not constant.TEST:
            raise Exception("L2尚未登录")
        if delay > 0:
            time.sleep(delay)
        add_codes = codes - self.subscripted_codes
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        try:
            for c in del_codes:
                self.l2_data_upload_manager.release_distributed_upload_queue(c)
                l2_data_manager.del_target_code(c)
            for c in codes:
                self.l2_data_upload_manager.distribute_upload_queue(c)
                l2_data_manager.add_target_code(c)
        except Exception as e:
            # Queue allocation failures are logged; subscription still proceeds.
            logger_system.error(f"L2代码分配上传队列出错:{str(e)}")
            logger_system.exception(e)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
        # Remember the most recent target set
        self.latest_codes_set = codes

    # Subscribe codes: [code, ...]
    def set_codes_data(self, codes):
        """Apply a new target list; the list is persisted even when applying fails."""
        try:
            self.__process_codes_data(codes)
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            # Keep a copy of the latest request on disk
            self.__set_latest_datas(codes)

    @classmethod
    def __set_latest_datas(cls, codes_data):
        """Write ``[today, codes_data]`` as JSON to the cache file."""
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w', encoding="utf-8") as f:
            f.write(data_str)

    @classmethod
    def __get_latest_datas(cls):
        """Return today's cached code list, or ``[]`` when absent, stale or unreadable."""
        if not os.path.exists(constant.L2_CODES_INFO_PATH):
            return []
        try:
            with open(constant.L2_CODES_INFO_PATH, mode='r', encoding="utf-8") as f:
                data_json = json.loads(f.readline())
            if data_json[0] == tool.get_now_date_str():
                return data_json[1]
        except (OSError, ValueError, IndexError, KeyError, TypeError) as e:
            # An empty or corrupt cache must not abort the post-login restore.
            logger_system.exception(e)
        return []

    def OnFrontConnected(self):
        """On connect: send a logout request, wait one second, then request login."""
        print("OnFrontConnected")
        logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}")
        logout_req = lev2mdapi.CTORATstpUserLogoutField()
        self.__api.ReqUserLogout(logout_req, 1)
        time.sleep(1)
        # Request login
        login_req = lev2mdapi.CTORATstpReqUserLoginField()
        self.__api.ReqUserLogin(login_req, 2)

    def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
        """On successful login, restore the cached subscriptions in a daemon thread."""
        print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
            pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            logger_system.info("L2行情登录成功")
            # Initial values come from the on-disk cache, applied after a 6s delay
            threading.Thread(
                target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=6.0),
                daemon=True).start()

    def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Record an acknowledged order-detail subscription; publish the set on the last response."""
        print("OnRspSubOrderDetail", pRspInfo)
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)

    def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        """Drop an unsubscribed code; publish the set on the last response."""
        print("OnRspUnSubOrderDetail", bIsLast)
        try:
            code = pSpecificSecurity['SecurityID']
            self.subscripted_codes.discard(code)
            if bIsLast == 1:
                print("取消订阅响应结束", self.subscripted_codes)
                l2_data_manager.add_subscript_codes(self.subscripted_codes)
        except Exception as e:
            logging.exception(e)

    def OnRtnTransaction(self, pTransaction):
        """Forward a transaction tick.

        ExecType ``b"2"`` (cancel) is converted into an order-detail record
        with status "D"; everything else is forwarded as a transaction.
        """
        if pTransaction['ExecType'] == b"2":
            # Cancel
            buy_no = pTransaction['BuyNo']
            sell_no = pTransaction['SellNo']
            if buy_no > 0:
                # Buy side
                order_no, side = buy_no, "1"
            elif sell_no > 0:
                # Sell side
                order_no, side = sell_no, "2"
            else:
                # Without an order number the record would lack "OrderNO"/"Side"
                # and the upload manager would raise KeyError; skip it.
                logging.warning("cancel transaction without BuyNo/SellNo: %s", pTransaction['SecurityID'])
                return
            item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                    "Volume": pTransaction['TradeVolume'],
                    "OrderType": "2",
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'],
                    "OrderStatus": "D",
                    "OrderNO": order_no,
                    "Side": side}
            self.l2_data_upload_manager.add_l2_order_detail(item, 0, True)
        else:
            item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
                    "TradeVolume": pTransaction['TradeVolume'],
                    "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
                    "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'],
                    "SellNo": pTransaction['SellNo'],
                    "ExecType": pTransaction['ExecType'].decode()}
            self.l2_data_upload_manager.add_transaction_detail(item)

    def OnRtnOrderDetail(self, pOrderDetail):
        """Forward an order-detail tick (on SSE, OrderStatus b"D" marks a cancel)."""
        item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
                "Volume": pOrderDetail['Volume'],
                "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(),
                "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
                "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                "OrderStatus": pOrderDetail['OrderStatus'].decode()}
        self.l2_data_upload_manager.add_l2_order_detail(item, 0)
| | | |
| | | |
class MyL2ActionCallback(L2ActionCallback):
    """Command callback that applies a new L2 target list to the module-level ``spi``."""

    def OnSetL2Position(self, codes):
        """Log how many codes arrived, then hand them to ``spi.set_codes_data``."""
        code_count = len(codes)
        huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", code_count)
        try:
            spi.set_codes_data(codes)
        except Exception as err:
            # Never let a failure propagate back into the command dispatcher.
            logging.exception(err)
| | | |
| | | |
def __init_l2(l2_data_upload_manager):
    """Create the Level-2 API in multicast + cache mode, attach the SPI and start it.

    The created ``Lev2MdSpi`` is published as the module-level ``spi``.
    """
    global spi

    print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())

    # Subscription mode: multicast (the TCP alternative is TORA_TSTP_MST_TCP).
    sub_mode = lev2mdapi.TORA_TSTP_MST_MCAST
    # Second argument True selects cache mode (False would be non-cache mode).
    api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(sub_mode, True)

    spi = Lev2MdSpi(api, l2_data_upload_manager)
    api.RegisterSpi(spi)

    if sub_mode == lev2mdapi.TORA_TSTP_MST_MCAST:
        # Receive market data from a single multicast address.
        api.RegisterMulticast(Multicast_Address, Local_Interface_Address, "")
    else:
        api.RegisterFront(Front_Address)

    # Run without CPU-core binding.
    api.Init()
| | | |
| | | |
# Small pool so command handling never blocks the queue reader.
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)


def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    """Read messages from the trade process forever and dispatch "l2_cmd" ones.

    Each message is JSON (``str`` or UTF-8 ``bytes``) with a ``"type"`` key.
    Messages of type ``"l2_cmd"`` are handed to the command manager on the
    thread pool; every other type is ignored. Errors are logged and the
    loop keeps running.
    """
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = queue_trade_w_l2_r.get()
            if not value:
                continue
            if isinstance(value, bytes):
                value = value.decode("utf-8")
            data = json.loads(value)
            if data["type"] != "l2_cmd":
                continue
            started_at = time.time()
            # Pass ``data`` as an argument instead of closing over it: a lambda
            # would look the variable up when the task runs, by which time the
            # loop may already have rebound it to the next message.
            __l2_cmd_thread_pool.submit(
                l2CommandManager.process_command, command_manager.CLIENT_TYPE_CMD_L2, None, data)
            use_time = time.time() - started_at
            if use_time > 0.005:
                huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s")
        except Exception as e:
            logging.exception(e)
| | | |
| | | |
pipe_strategy = None


def test_add_codes(queue_r):
    """Manual smoke test: push a demo "l2_cmd" then feed fake cancel orders forever.

    Sleeps 12 seconds in total, enqueues four demo codes on ``queue_r``,
    waits 10 more seconds and then, every 10 seconds, injects three
    hand-written order records for code 603002 through the global ``spi``.
    """

    def _push_order(volume, sub_seq, order_no):
        # One fake order-detail record; only volume and sequence ids vary.
        spi.l2_data_upload_manager.add_l2_order_detail(
            {'SecurityID': '603002', 'Price': 6.35, 'Volume': volume, 'Side': "1", 'OrderType': '0',
             'OrderTime': '13000015',
             'MainSeq': 2, 'SubSeq': sub_seq, 'OrderNO': order_no, 'OrderStatus': 'D'}, 0)

    time.sleep(10)
    time.sleep(2)
    demo_datas = ["603002", "002654", "603701", "002908"]
    queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": demo_datas}))
    time.sleep(10)
    while True:
        try:
            _push_order(275000, 6739147, 5512466)
            _push_order(200, 6739148, 5512467)
            time.sleep(0.1)
            _push_order(100, 6739148, 5512467)
        except Exception as err:
            logging.exception(err)
        finally:
            time.sleep(10)
| | | |
| | | |
def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None:
    """Entry point of the L2 process; never returns.

    Builds the upload pipeline, starts the Level-2 API, initialises the
    command manager and only then starts the thread that reads commands
    from ``queue_r``. Any start-up error is logged; the function then parks
    in a sleep loop to keep the process (and its daemon threads) alive.

    :param queue_r: queue of JSON commands from the trade process, or None.
    :param data_callbacks: callbacks handed to CodeDataCallbackDistributeManager.
    """
    global l2CommandManager

    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()

        # Initialise the pipeline and the market-data API (sets global ``spi``).
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
        __init_l2(l2_data_upload_manager)

        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())

        # Start the reader last: it uses ``l2CommandManager`` (and, through the
        # callback, ``spi``), so starting it earlier lets an early command hit
        # a NameError. Commands sent meanwhile simply wait in the queue.
        if queue_r is not None:
            threading.Thread(target=__receive_from_queue_trade, args=(queue_r,), daemon=True).start()

        logger_system.info("L2订阅服务启动成功")
    except Exception as e:
        logger_system.exception(e)
    while True:
        time.sleep(2)


if __name__ == "__main__":
    input()
New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import logging |
| | | import marshal |
| | | import queue |
| | | import threading |
| | | import time |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_error |
| | | import collections |
| | | |
# Activity-time maps for the upload channels (presumably keyed by code — confirm with users).
order_detail_upload_active_time_dict = dict()
transaction_upload_active_time_dict = dict()
# Temporary data
tmep_order_detail_queue_dict = dict()
tmep_transaction_queue_dict = dict()
# Target codes and the time each one was added
target_codes = set()
target_codes_add_time = dict()
# Shared queue for out-of-band messages
common_queue = queue.Queue()
| | | |
| | | |
# L2 upload data manager
class L2DataUploadManager:
    """Buffers per-code L2 ticks and delivers them to the distributed callbacks.

    ``distribute_upload_queue`` creates two deques per code and starts two
    daemon worker threads that drain them; ``release_distributed_upload_queue``
    removes the deques, which makes the workers exit.
    """

    def __init__(self, data_callback_distribute_manager: "CodeDataCallbackDistributeManager"):
        self.data_callback_distribute_manager = data_callback_distribute_manager
        # Per-code buffers
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
        self.temp_log_queue_dict = {}

        self.filter_order_condition_dict = {}
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()

    # Order filter
    def __filter_order(self, item):
        """Return ``item`` unless price * volume is below 500000, then ``None``."""
        if item[1] * item[2] < 500000:
            return None
        return item

    # Transaction filter
    def __filter_transaction(self, item):
        """Pass every transaction through unchanged."""
        return item

    # Add an order detail
    def add_l2_order_detail(self, data, start_time=0, istransaction=False):
        """Queue one order-detail record for its code.

        Records for a code without an allocated queue (never distributed or
        already released) are dropped instead of raising.
        ``istransaction`` is accepted for compatibility and not used.
        """
        q = self.temp_order_queue_dict.get(data["SecurityID"])
        if q is None:
            return
        q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'],
                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'],
                  time.time(), start_time))

    # Add a transaction
    def add_transaction_detail(self, data):
        """Queue one transaction record; dropped when the code has no queue."""
        q = self.temp_transaction_queue_dict.get(data["SecurityID"])
        if q is None:
            return
        q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                  data['SellNo'], data['ExecType']))

    # Allocate upload queues
    def distribute_upload_queue(self, code):
        """Ensure ``code`` has a callback, buffers and running worker threads."""
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code)

        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=self.__run_upload_order_task, args=(code,), daemon=True)
            t1.start()
            t2 = threading.Thread(target=self.__run_upload_transaction_task, args=(code,), daemon=True)
            t2.start()
            self.upload_l2_data_task_dict[code] = (t1, t2)

    # Release allocated queues
    def release_distributed_upload_queue(self, code):
        """Release the callback and discard every buffer held for ``code``."""
        self.data_callback_distribute_manager.release_distribute_callback(code)
        order_q = self.temp_order_queue_dict.pop(code, None)
        if order_q is not None:
            order_q.clear()
        transaction_q = self.temp_transaction_queue_dict.pop(code, None)
        if transaction_q is not None:
            transaction_q.clear()
        self.temp_log_queue_dict.pop(code, None)
        self.upload_l2_data_task_dict.pop(code, None)

    def __upload_l2_data(self, code, _queue, datas):
        """Put a marshalled ``[code, datas, now]`` record on ``_queue``."""
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))

    # Process and deliver order data
    def __run_upload_order_task(self, code):
        """Worker loop: drain the order deque of ``code`` and call ``OnL2Order``.

        Exits once the deque it was started with is no longer the one
        registered for ``code`` (released, or released and re-allocated).
        """
        q = self.temp_order_queue_dict.get(code)
        if q is None:
            # Released before the thread got to run.
            return
        temp_list = []
        while True:
            try:
                while q:
                    # Pre-filter: drop records that are not worth delivering
                    data = self.__filter_order(q.popleft())
                    if data:
                        temp_list.append(data)

                if not temp_list:
                    if self.temp_order_queue_dict.get(code) is not q:
                        self.l2_order_codes.discard(code)
                        break
                    self.l2_order_codes.add(code)
                    time.sleep(0.001)
                    continue

                callback = self.data_callback_distribute_manager.get_distributed_callback(code)
                if not callback:
                    # Callback already released: nothing can receive this batch.
                    temp_list = []
                    continue
                start_time = time.time()
                last_data = temp_list[-1]
                callback.OnL2Order(code, temp_list, time.time())
                use_time = time.time() - start_time
                if use_time > 0.01:
                    # Record deliveries slower than 10ms
                    huaxin_l2_log.info(logger_local_huaxin_l2_error, f"耗时:{use_time}s 结束数据:{last_data}")
                temp_list = []
            except Exception as e:
                logging.exception(e)
                # Back off so a persistent failure cannot become a hot loop.
                time.sleep(0.001)

    # Process and deliver transaction data
    def __run_upload_transaction_task(self, code):
        """Worker loop: drain the transaction deque of ``code`` and call ``OnL2Transaction``.

        Same exit rule as the order worker.
        """
        q = self.temp_transaction_queue_dict.get(code)
        if q is None:
            return
        temp_list = []
        while True:
            try:
                while q:
                    data = self.__filter_transaction(q.popleft())
                    if data:
                        temp_list.append(data)

                if not temp_list:
                    if self.temp_transaction_queue_dict.get(code) is not q:
                        self.l2_transaction_codes.discard(code)
                        break
                    self.l2_transaction_codes.add(code)
                    time.sleep(0.001)
                    continue

                callback = self.data_callback_distribute_manager.get_distributed_callback(code)
                if not callback:
                    temp_list = []
                    continue
                callback.OnL2Transaction(code, temp_list)
                temp_list = []
            except Exception as e:
                # Previously swallowed silently; log it and back off.
                logging.exception(e)
                time.sleep(0.001)
| | | |
| | | |
def add_target_code(code):
    """Mark ``code`` as a target and record the time it was added."""
    target_codes_add_time[code] = time.time()
    target_codes.add(code)
| | | |
| | | |
def del_target_code(code):
    """Remove ``code`` from the targets and forget its add time, if present."""
    target_codes.discard(code)
    target_codes_add_time.pop(code, None)
| | | |
| | | |
def add_subscript_codes(codes):
    """Publish the subscribed codes on the shared queue as an "l2_subscript_codes" message."""
    message = ('', "l2_subscript_codes", [*codes])
    common_queue.put(message)
New file |
| | |
| | | """ |
| | | L2数据传输协议 |
| | | """ |
| | | |
| | | |
class L2DataCallBack:
    """No-op base class for receivers of L2 data; override the hooks you need."""

    def OnL2Order(self, code, datas, timestamp):
        """Hook for L2 order details; does nothing by default."""
        return None

    def OnL2Transaction(self, code, datas):
        """Hook for L2 transactions; does nothing by default."""
        return None

    def OnMarketData(self, code, datas):
        """Hook for market data; does nothing by default."""
        return None

    def OnTradingOrderCancel(self, code, buy_no):
        """Hook for a trading-order cancel; does nothing by default."""
        return None
New file |
| | |
| | | """ |
| | | 华鑫LV2处理工具类 |
| | | """ |
| | | |
| | | |
| | | # 处理逐笔委托 |
| | | # item逐笔委托 |
| | | # (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], |
| | | # data['OrderTime'],data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus']) |
| | | |
| | | |
def convert_time(time_str, with_ms=False):
    """Format a compact time value as ``HH:MM:SS`` (optionally ``HH:MM:SS.mmm``).

    A value whose text starts with "9" gets a leading "0" first. Digits after
    the sixth are the fraction, right-padded with zeros to three characters.
    """
    digits = str(time_str)
    if digits[:1] == "9":
        digits = "0" + digits
    clock = ":".join((digits[0:2], digits[2:4], digits[4:6]))
    if not with_ms:
        return clock
    return clock + "." + digits[6:].ljust(3, "0")
| | | |
| | | |
def __convert_order(item, limit_up_price):
    """Convert a raw order-detail tuple into the dict format used downstream.

    ``item`` layout: (SecurityID, Price, Volume, Side, OrderType, OrderTime,
    MainSeq, SubSeq, OrderNO, OrderStatus, ...).
    operateType: 0 buy, 1 buy-cancel, 2 sell, 3 sell-cancel.
    """
    digits = str(item[5])
    if digits.startswith("9"):
        digits = "0" + digits
    clock = ":".join((digits[0:2], digits[2:4], digits[4:6]))
    millis = digits[6:].ljust(3, "0")

    price = item[1]
    if price <= 0:
        # SZSE buy-cancels carry no price; fall back to the limit-up price for now.
        price = limit_up_price
    at_limit_up = 1 if abs(limit_up_price - price) < 0.001 else 0

    is_buy = item[3] == '1'
    if item[9] == 'D':
        operate_type = 1 if is_buy else 3
    else:
        operate_type = 0 if is_buy else 2

    return {"time": clock, "tms": millis, "price": price, "num": item[2] // 100, "limitPrice": at_limit_up,
            "operateType": operate_type, "cancelTime": 0, "cancelTimeUnit": 0, "orderNo": item[8],
            "mainSeq": item[6], "subSeq": item[7]}
| | | |
| | | |
# Format L2 data
# filter_not_limit_up: drop buys / buy-cancels that are not at the limit-up price
def __format_l2_data(origin_datas, code, limit_up_price, filter_not_limit_up=True):
    """Wrap converted records as ``{"key", "val", "re"}`` entries, merging duplicates.

    Records sharing ``code-mainSeq-subSeq`` are merged and their ``re`` count
    is incremented. When ``limit_up_price`` is given, each record's
    ``limitPrice`` is rewritten in place as the string "1" or "0".
    """
    datas = []
    position_by_key = {}
    same_time_num = {}
    for item in origin_datas:
        tick_time = item["time"]
        same_time_num[tick_time] = same_time_num.get(tick_time, 0) + 1

        price = item["price"]
        num = item["num"]
        limit_flag = item["limitPrice"]
        # Limit-up price
        if limit_up_price is not None:
            limit_flag = 1 if abs(price - limit_up_price) < 0.001 else 0
            item["limitPrice"] = "{}".format(limit_flag)
        operate_type = item["operateType"]
        # Non-limit-up buys and buy-cancels are not needed (unless num == 1)
        if (filter_not_limit_up and int(item["limitPrice"]) != 1
                and (int(operate_type) == 0 or int(operate_type) == 1) and num != 1):
            continue
        key = "{}-{}-{}".format(code, item["mainSeq"], item["subSeq"])
        if key in position_by_key:
            # Seen before: bump the repeat count
            datas[position_by_key[key]]['re'] += 1
            continue
        # First occurrence: repeat count starts at 1
        datas.append({"key": key, "val": item, "re": 1})
        position_by_key[key] = len(datas) - 1
    return datas
| | | |
| | | |
def get_format_l2_datas(code, origin_datas, limit_up_price, start_index):
    """Convert raw order tuples to formatted, indexed L2 records.

    :param code: security code, used to build each record key.
    :param origin_datas: raw order-detail tuples; empty input yields ``[]``.
    :param limit_up_price: limit-up price, anything ``float()`` accepts.
    :param start_index: index assigned to the first returned record.
    :return: list of ``{"key", "val", "re", "index"}`` dicts.
    """
    if not origin_datas:
        # Nothing to convert; also avoids indexing datas[0] below.
        return []
    limit_up = float(limit_up_price)
    # Convert the raw format first
    datas = [__convert_order(x, limit_up) for x in origin_datas]
    # Up to and including 09:25 non-limit-up records are not filtered out
    filter_not_limit_up = int(datas[0]["time"][:5].replace(":", "")) > 925
    fdatas = __format_l2_data(datas, code, limit_up, filter_not_limit_up=filter_not_limit_up)
    for offset, fdata in enumerate(fdatas):
        fdata["index"] = start_index + offset
    return fdatas
| | | |
| | | |
if __name__ == "__main__":
    # Manual check: parse a few logged order tuples and print the conversion.
    import ast

    ds = ["('605167', 10.08, 68500, '1', '0', 9303108, 2, 439438, 436472, 'D', 1695864632451)",
          "('603439', 17.97, 27800, '1', '0', 9304966, 6, 435127, 407524, 'D', 1695864649883)",
          "('002369', 0.0, 100800, '1', '2', 93051880, 2011, 1431910, 1160638, 'D', 1695864651875)"
          ]
    for d in ds:
        # literal_eval only accepts Python literals, unlike eval.
        print(__convert_order(ast.literal_eval(d), 15.55))
New file |
| | |
| | | """ |
| | | L2数据溯源 |
| | | """ |
| | | import constant |
| | | from log_module.log import logger_l2_error |
| | | from utils import tool |
| | | |
| | | |
| | | class L2DataSourceUtils(object): |
| | | __cancel_and_buy_map = {} |
| | | __buy_and_cancel_map = {} |
| | | |
| | | @classmethod |
| | | def __save_map(cls, code, buy_index, cancel_index): |
| | | if cls.__cancel_and_buy_map.get(code) is None: |
| | | cls.__cancel_and_buy_map[code] = {} |
| | | cls.__cancel_and_buy_map[code][cancel_index] = buy_index |
| | | |
| | | if cls.__buy_and_cancel_map.get(code) is None: |
| | | cls.__buy_and_cancel_map[code] = {} |
| | | cls.__buy_and_cancel_map[code][buy_index] = cancel_index |
| | | |
| | | @classmethod |
| | | def __get_cancel_index_cache(cls, code, buy_index): |
| | | if code not in cls.__buy_and_cancel_map: |
| | | return None |
| | | return cls.__buy_and_cancel_map[code].get(buy_index) |
| | | |
| | | @classmethod |
| | | def __get_buy_index_cache(cls, code, cancel_index): |
| | | if code not in cls.__cancel_and_buy_map: |
| | | return None |
| | | return cls.__cancel_and_buy_map[code].get(cancel_index) |
| | | |
| | | @classmethod |
| | | def __compare_time(cls, time1, time2): |
| | | result = int(time1.replace(":", "", 2)) - int(time2.replace(":", "", 2)) |
| | | return result |
| | | |
| | | # 计算时间的区间 |
| | | @classmethod |
| | | def __compute_time_space_as_second(cls, cancel_time, cancel_time_unit): |
| | | __time = int(cancel_time) |
| | | if int(cancel_time) == 0: |
| | | return 0, 0 |
| | | unit = int(cancel_time_unit) |
| | | if unit == 0: |
| | | # 秒 |
| | | return __time, (__time + 1) |
| | | elif unit == 1: |
| | | # 分钟 |
| | | return __time * 60, (__time + 1) * 60 |
| | | elif unit == 2: |
| | | # 小时 |
| | | return __time * 3600, (__time + 1) * 3600 |
| | | |
| | | # 华鑫渠道的L2,根据买撤数据查找买入数据 |
| | | @classmethod |
| | | def __get_buy_index_with_cancel_data_by_huaxin_l2(cls, code, cancel_data, local_today_num_operate_map): |
| | | buy_datas = local_today_num_operate_map.get( |
| | | "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"])) |
| | | if buy_datas is None: |
| | | # 无数据 |
| | | return None |
| | | # orderNo |
| | | for bd in buy_datas: |
| | | # 根据订单号做匹配 |
| | | if bd["val"]["orderNo"] == cancel_data["val"]["orderNo"]: |
| | | return bd["index"] |
| | | return None |
| | | |
# THS-channel L2: find the buy record for a buy-cancel record
@classmethod
def __get_buy_index_with_cancel_data_by_ths_l2(cls, code, cancel_data, local_today_num_operate_map):
    """Find the buy record by matching volume and the time window implied by the cancel delay.

    A candidate already cached as belonging to a different cancel is skipped.
    With several matches the one closest to the cancel index wins; the
    chosen pair is cached. Returns the buy index or ``None``.
    """
    val = cancel_data["val"]
    buy_datas = local_today_num_operate_map.get("{}-{}-{}".format(val["num"], "0", val["price"]))
    if buy_datas is None:
        # No data
        return None
    min_space, max_space = cls.__compute_time_space_as_second(val["cancelTime"], val["cancelTimeUnit"])
    max_time = tool.trade_time_add_second(val["time"], 0 - min_space)
    min_time = tool.trade_time_add_second(val["time"], 0 - max_space)
    exact_only = min_space == 0 and max_space == 0
    # Matched indexes
    suit_indexes = []
    for data in buy_datas:
        if int(data["val"]["operateType"]) != 0:
            continue
        if int(data["val"]["num"]) != int(val["num"]):
            continue
        # Skip buys already paired with some other cancel
        cancel_index = cls.__get_cancel_index_cache(code, data["index"])
        if cancel_index is not None and cancel_index != cancel_data["index"]:
            continue

        if exact_only:
            matched = cls.__compare_time(data["val"]["time"], min_time) == 0
        else:
            matched = (cls.__compare_time(data["val"]["time"], min_time) > 0
                       and cls.__compare_time(data["val"]["time"], max_time) <= 0)
        if matched:
            suit_indexes.append(data["index"])

    if len(suit_indexes) >= 2:
        # Several matches: prefer the one nearest to the cancel position
        suit_indexes.sort(key=lambda t: cancel_data["index"] - t)

    if not suit_indexes:
        return None
    cls.__save_map(code, suit_indexes[0], cancel_data["index"])
    return suit_indexes[0]
| | | |
@classmethod
def __get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
    """Return the buy index for a cancel record: cache first, then the channel-specific search."""
    cached = cls.__get_buy_index_cache(code, cancel_data["index"])
    if cached is not None:
        return cached
    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN:
        finder = cls.__get_buy_index_with_cancel_data_by_huaxin_l2
    else:
        finder = cls.__get_buy_index_with_cancel_data_by_ths_l2
    return finder(code, cancel_data, local_today_num_operate_map)
| | | |
@classmethod
def get_buy_index_with_cancel_data(cls, code, cancel_data, local_today_num_operate_map):
    """Compute the buy index for a buy-cancel record against today's data.

    All cancels sharing the same num/price are resolved first, in index
    order, before the requested cancel is resolved.

    :param code: stock code
    :param cancel_data: the buy-cancel record to trace
    :param local_today_num_operate_map: map of "<num>-<operateType>-<price>" to records
    :return: index of the matched buy record, or None
    """
    key = "{}-{}-{}".format(cancel_data["val"]["num"], "1", cancel_data["val"]["price"])
    cancel_datas = local_today_num_operate_map.get(key)
    if cancel_datas:
        try:
            # In-place sort of the shared list, as before.
            cancel_datas.sort(key=lambda t: t["index"])
        except Exception as e:
            # Previously hidden behind a debug print; keep going with the
            # unsorted list but leave a trace of what went wrong.
            logger_l2_error.error(f"failed to sort cancel datas: code-{code} key-{key} error:{str(e)}")
        for item in cancel_datas:
            # Resolve earlier cancels up front.
            cls.__get_buy_index_with_cancel_data(code, item, local_today_num_operate_map)

    return cls.__get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
| | | |
@classmethod
def get_buy_index_with_cancel_data_v2(cls, cancel_data, buyno_map):
    """Look up the buy index of a cancel record by order number (native Huaxin L2 data).

    :param cancel_data: cancel record whose ``val["orderNo"]`` identifies the order
    :param buyno_map: map of order number (as str) to buy record
    :return: ``index`` of the buy record, or None when the order is unknown
    """
    matched_buy = buyno_map.get(str(cancel_data["val"]["orderNo"]))
    return matched_buy["index"] if matched_buy else None
| | | |
@classmethod
def get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map):
    """Return how many repetitions of the buy at ``index`` are not cancelled.

    :param code: stock code
    :param index: position of the buy record in ``total_data``
    :param total_data: today's records
    :param local_today_num_operate_map: map of "<num>-<operateType>-<price>" to records
    :return: ``re`` of the buy minus ``re`` of its matching cancel (never
        below 0), the full ``re`` when no cancel matches, or 0 when the buy
        record cannot be found
    """
    try:
        data = total_data[index]
    except Exception as e:
        logger_l2_error.error(f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
        # Without the record there is nothing to count; previously this fell
        # through and failed with a TypeError on ``None["val"]``.
        return 0
    val = data["val"]
    # Check whether this buy has already been cancelled.
    cancel_datas = local_today_num_operate_map.get(
        "{}-{}-{}".format(val["num"], "1", val["price"]))
    for cancel_data in cancel_datas or ():
        buy_index = cls.get_buy_index_with_cancel_data(code, cancel_data, local_today_num_operate_map)
        if buy_index == index:
            # Only the first matching cancel is considered.
            return max(data["re"] - cancel_data["re"], 0)
    return data["re"]
| | | |
| | | # 获取没撤的笔数 |
| | | |
@classmethod
def get_limit_up_buy_no_canceled_count_v2(cls, code, index, total_data, canceled_buyno_map):
    """Return the uncancelled count of the limit-up buy at ``index`` (order-number based).

    :param code: stock code (unused here, kept for a uniform signature)
    :param index: position of the buy record in ``total_data``
    :param total_data: today's records
    :param canceled_buyno_map: map of order number (as str) to cancel record
    :return: ``re`` of the buy minus ``re`` of its cancel, the full ``re``
        when the order has no cancel, or 0 when the buy record cannot be found
    """
    try:
        data = total_data[index]
    except Exception as e:
        logger_l2_error.error(
            f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
        # Previously fell through and failed with a TypeError on ``None["val"]``.
        return 0
    canceled_data = canceled_buyno_map.get(str(data["val"]["orderNo"]))
    if canceled_data:
        return data["re"] - canceled_data["re"]
    return data["re"]
| | | |
@classmethod
def get_limit_up_buy_canceled_data_v2(cls, code, index, total_data, canceled_buyno_map):
    """Return the cancel record of the buy at ``index``, if there is one.

    :param code: stock code (unused here, kept for a uniform signature)
    :param index: position of the buy record in ``total_data``
    :param total_data: today's records
    :param canceled_buyno_map: map of order number (as str) to cancel record
    :return: the cancel record, or None when the order was not cancelled or
        the buy record cannot be found
    """
    try:
        data = total_data[index]
    except Exception as e:
        logger_l2_error.error(
            f"未找到买入索引对应的数据:index-{index} total_data长度-{len(total_data) if total_data else 0} 错误原因:{str(e)}")
        # Previously fell through and failed with a TypeError on ``None["val"]``.
        return None
    canceled_data = canceled_buyno_map.get(str(data["val"]["orderNo"]))
    return canceled_data if canceled_data else None
| | | |
| | | |
| | | # if __name__ == "__main__": |
| | | # code = "000925" |
| | | # l2_data_util.load_l2_data(code) |
| | | # total_datas = l2_data_util.local_today_datas.get(code) |
| | | # local_today_num_operate_map = l2_data_util.local_today_num_operate_map.get(code) |
| | | # cancel_index = 900 |
| | | # index = L2DataSourceUtils.get_buy_index_with_cancel_data(code, total_datas[cancel_index], |
| | | # l2_data_util.local_today_num_operate_map.get(code)) |
| | | # print("溯源位置:", index) |
New file |
| | |
| | | """ |
| | | L2相关数据处理 |
| | | """ |
| | | |
| | | # L2交易队列 |
| | | import datetime |
| | | import decimal |
| | | import json |
| | | import logging |
| | | import time |
| | | |
| | | import numpy |
| | | |
| | | import constant |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from l2 import l2_data_source_util |
| | | from log_module import log, async_log_util, log_export |
| | | from db import redis_manager_delegate as redis_manager |
| | | from utils import tool |
| | | |
__db = 1
_redisManager = redis_manager.RedisManager(1)
# L2 data management
# Most recently uploaded batch of records, keyed by stock code
local_latest_datas = {}
# Today's records, keyed by stock code
local_today_datas = {}
# Per-code lookup keyed by "<num>-<operateType>-<price>";
# trades memory for faster processing
local_today_num_operate_map = {}

# Per-code map of buy order number to record; only native L2 data has order numbers
local_today_buyno_map = {}

# Per-code map of order number to cancel record
local_today_canceled_buyno_map = {}
| | | |
| | | |
def load_l2_data(code, load_latest=True, force=False):
    """Populate the in-memory L2 caches for ``code``.

    :param code: stock code
    :param load_latest: also refresh the latest-batch cache from Redis
    :param force: reload even when the caches already hold data for ``code``
    :return: False when today's data was (re)loaded and has fewer records
        than its last ``index`` implies; True otherwise
    """
    if load_latest and (local_latest_datas.get(code) is None or force):
        # Latest uploaded batch.
        raw_latest = RedisUtils.get(_redisManager.getRedis(), "l2-data-latest-{}".format(code))
        if raw_latest is not None:
            local_latest_datas[code] = json.loads(raw_latest)

    if not (local_today_datas.get(code) is None or force):
        return True

    # Today's records come from the log file.
    today_records = log_export.load_l2_from_log().get(code)
    if today_records is None:
        today_records = []
    local_today_datas[code] = today_records
    data_normal = not (today_records and len(today_records) < today_records[-1]["index"] + 1)

    # Rebuild the derived lookup maps from today's records.
    load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
    load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
    load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code), force)
    return data_normal
| | | |
| | | |
def load_l2_data_all(force=False):
    """Load today's L2 records for every code found in the log.

    Existing entries in ``local_today_datas`` are kept unless ``force`` is
    set; the derived lookup maps are refreshed for every code either way.
    """
    logged = log_export.load_l2_from_log()
    for code in logged:
        if force or code not in local_today_datas:
            local_today_datas[code] = logged[code]
        load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        load_buy_no_map(local_today_buyno_map, code, local_today_datas.get(code), force)
        load_canceled_buy_no_map(local_today_canceled_buyno_map, code, local_today_datas.get(code), force)
| | | |
| | | |
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    """Group records into ``local_today_num_operate_map[code]`` by "<num>-<operateType>-<price>".

    :param local_today_num_operate_map: per-code map, updated in place
    :param code: stock code
    :param source_datas: records to add
    :param clear: start from an empty map for ``code``
    """
    if clear or local_today_num_operate_map.get(code) is None:
        local_today_num_operate_map[code] = {}

    grouped = local_today_num_operate_map[code]
    for record in source_datas:
        val = record["val"]
        group_key = "{}-{}-{}".format(val["num"], val["operateType"], val["price"])
        grouped.setdefault(group_key, []).append(record)
| | | |
| | | |
def load_buy_no_map(local_today_buyno_map, code, source_datas, clear=False):
    """Index buy records (``operateType == 0``) by order number.

    Only native L2 data carries order numbers. The first record seen for an
    order number wins.

    :param local_today_buyno_map: per-code map, updated in place
    :param code: stock code
    :param source_datas: records to scan
    :param clear: start from an empty map for ``code``
    """
    if clear or local_today_buyno_map.get(code) is None:
        local_today_buyno_map[code] = {}

    by_order_no = local_today_buyno_map[code]
    for record in source_datas:
        if record["val"]["operateType"] == 0:
            by_order_no.setdefault("{}".format(record["val"]["orderNo"]), record)
| | | |
| | | |
def load_canceled_buy_no_map(local_today_canceled_buyno_map, code, source_datas, clear=False):
    """Index buy-cancel records (``operateType == 1``) by order number.

    Only native L2 data carries order numbers. The first cancel seen for an
    order number wins.

    :param local_today_canceled_buyno_map: per-code map, updated in place
    :param code: stock code
    :param source_datas: records to scan
    :param clear: start from an empty map for ``code``
    """
    if clear or local_today_canceled_buyno_map.get(code) is None:
        local_today_canceled_buyno_map[code] = {}

    canceled_by_order_no = local_today_canceled_buyno_map[code]
    for record in source_datas:
        if record["val"]["operateType"] == 1:
            canceled_by_order_no.setdefault("{}".format(record["val"]["orderNo"]), record)
| | | |
| | | |
def save_l2_data(code, datas, add_datas):
    """Persist the latest L2 batch and log the newly added records.

    Nothing happens unless ``add_datas`` is non-empty.

    :param code: stock code
    :param datas: the full latest batch
    :param add_datas: the records that are new in this batch
    """
    if not add_datas:
        return
    if datas:
        # Store the latest batch in Redis (asynchronously) ...
        latest_key = "l2-data-latest-{}".format(code)
        RedisUtils.setex_async(__db, latest_key, tool.get_expire(), json.dumps(datas))
        # ... and mirror it in memory.
        local_latest_datas[code] = datas
        set_l2_data_latest_count(code, len(datas))
    try:
        async_log_util.l2_data_log.info(log.logger_l2_data, f"{code}-{add_datas}")
    except Exception as e:
        logging.exception(e)
| | | |
| | | |
def set_l2_data_latest_count(code, count):
    """Store the size of the latest L2 batch for ``code`` in Redis (expiry argument: 2)."""
    RedisUtils.setex(_redisManager.getRedis(), "latest-l2-count-{}".format(code), 2, count)
| | | |
| | | |
def get_l2_data_latest_count(code):
    """Return the stored size of the latest L2 batch for ``code``.

    :return: 0 for a missing/empty code or when Redis has no value,
        otherwise the stored value as int
    """
    if code is None or len(code) == 0:
        return 0
    stored = RedisUtils.get(_redisManager.getRedis(), "latest-l2-count-{}".format(code))
    return 0 if stored is None else int(stored)
| | | |
| | | |
def parseL2Data(str):
    """Unpack an uploaded L2 JSON message.

    :param str: JSON text with top-level ``client`` and ``data`` members
    :return: tuple ``(day, client, channel, code, capture_time, process_time,
        data, count)`` where ``day`` is today's date as ``YYYYMMDD``
    """
    day = datetime.datetime.now().strftime("%Y%m%d")
    message = json.loads(str)
    body = message["data"]
    return (day, message["client"], body["channel"], body["code"], body["captureTime"],
            body["processTime"], body["data"], body["count"])
| | | |
| | | |
def is_origin_data_diffrent(data1, data2):
    """Cheaply decide whether two raw data lists differ.

    Lists of different length (or a missing list) count as different.
    Otherwise roughly ten evenly spaced positions are compared by their JSON
    serialisation, so a difference between sampled positions can go
    unnoticed on long lists.

    :return: True when a difference was detected, False otherwise
    """
    if data1 is None or data2 is None:
        return True
    if len(data1) != len(data2):
        return True
    data_length = len(data1)
    # A step of 0 (fewer than 10 items) made range() raise ValueError;
    # short lists are now compared element by element.
    step = max(1, data_length // 10)
    for i in range(0, data_length, step):
        if json.dumps(data1[i]) != json.dumps(data2[i]):
            return True
    return False
| | | |
| | | |
def is_big_money(val):
    """Tell whether an order counts as a big order.

    With a price above 3.0 the threshold on ``price * num`` is 29900;
    otherwise it is 95% of ``price * 10000``.

    :param val: record value with ``price`` and ``num``
    """
    price = float(val["price"])
    money = price * int(val["num"])
    threshold = 29900 if price > 3.0 else price * 10000 * 0.95
    return money >= threshold
| | | |
| | | |
class L2DataUtil:
    """Helpers for diffing, normalising and classifying L2 records.

    A record is a dict with ``key``, ``val`` (the raw item), ``re``
    (repeat count) and, once assigned, ``index``.
    """

    @classmethod
    def is_same_time(cls, time1, time2):
        """Return True when two ``HH:MM:SS`` times are less than 3 seconds apart.

        Always True when ``constant.TEST`` is set.
        """
        if constant.TEST:
            return True
        return abs(cls.get_time_as_second(time2) - cls.get_time_as_second(time1)) < 3

    @classmethod
    def get_add_data(cls, code, latest_datas, datas, _start_index):
        """Return the records of ``datas`` that are new relative to ``latest_datas``.

        The last record of ``latest_datas`` is searched for in ``datas`` (from
        the end, by ``key``); everything after it is new. When it is not found,
        all of ``datas`` is new if its first record is not older than that last
        record, otherwise nothing is. Returned records get consecutive
        ``index`` values starting at ``_start_index`` (assigned in place).

        :param code: stock code (unused)
        :param latest_datas: previously seen batch, may be None/empty
        :param datas: current batch, may be None/empty
        :param _start_index: index to assign to the first new record
        :return: list of new records
        """
        # Also covers datas=None, which used to fail in reversed().
        if not datas:
            return []
        last_data = latest_datas[-1] if latest_datas else None

        if last_data is None:
            # Nothing seen before: the whole batch is new.
            _add_datas = datas[0:]
        else:
            start_index = -1
            for pos in range(len(datas) - 1, -1, -1):
                if datas[pos]["key"] == last_data["key"]:
                    start_index = pos
                    break
            if start_index < 0:
                if cls.get_time_as_second(datas[0]["val"]["time"]) >= cls.get_time_as_second(
                        last_data["val"]["time"]):
                    _add_datas = datas
                else:
                    _add_datas = []
            else:
                _add_datas = datas[start_index + 1:]

        for offset, record in enumerate(_add_datas):
            record["index"] = _start_index + offset
        return _add_datas

    @classmethod
    def correct_data(cls, code, latest_datas, _datas):
        """Raise the ``re`` of previously seen records to the larger new value.

        For every record in ``_datas`` whose ``key`` matches a record in
        ``latest_datas`` with a smaller ``re``, the older record's ``re`` is
        updated in place. When anything changed, ``local_latest_datas[code]``
        is set to the corrected list.

        :return: ``_datas`` unchanged
        """
        latest_data = latest_datas if latest_datas is not None else []
        corrected = False
        for data in _datas:
            for _ldata in latest_data:
                # Only a larger repeat count in the new data is taken over.
                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
                    _ldata["re"] = data["re"]
                    corrected = True
        if corrected:
            # Not written to redis for now; memory only.
            local_latest_datas[code] = latest_data
        return _datas

    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        """Convert raw L2 items into de-duplicated records.

        When ``limit_up_price`` is given, each item's ``limitPrice`` is
        rewritten to "1"/"0" depending on whether its price equals it. Buys
        and buy-cancels that are not at the limit-up price are dropped.
        Identical items are merged into one record whose ``re`` counts them.

        :param data: iterable of raw items (mutated: ``limitPrice``)
        :param code: stock code, part of each record key
        :param limit_up_price: limit-up price or None
        :return: list of ``{"key", "val", "re"}`` records
        """
        datas = []
        key_positions = {}
        for item in data:
            time_ = item["time"]
            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            if limit_up_price is not None:
                limitPrice = 1 if limit_up_price == tool.to_price(decimal.Decimal(price)) else 0
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            # Non-limit-up buys and buy-cancels are not needed.
            if int(item["limitPrice"]) != 1 and int(operateType) in (0, 1):
                continue

            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time_, num, price, limitPrice, cancelTime,
                                                   cancelTimeUnit)
            if key in key_positions:
                # Seen before: bump the repeat count.
                datas[key_positions[key]]['re'] += 1
            else:
                datas.append({"key": key, "val": item, "re": 1})
                key_positions[key] = len(datas) - 1
        return datas

    @classmethod
    def get_time_as_second(cls, time_str):
        """Convert ``HH:MM:SS`` to seconds since midnight."""
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])

    @classmethod
    def is_limit_up_price_buy(cls, val):
        """True for a buy (``operateType`` 0) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 0

    @classmethod
    def is_limit_up_price_sell(cls, val):
        """True for a sell (``operateType`` 2) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 2

    @classmethod
    def is_limit_up_price_sell_cancel(cls, val):
        """True for a sell-cancel (``operateType`` 3) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 3

    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        """True for a buy-cancel (``operateType`` 1) at the limit-up price."""
        return int(val["limitPrice"]) == 1 and int(val["operateType"]) == 1

    @classmethod
    def is_buy_cancel(cls, val):
        """True when ``operateType`` is 1 (buy-cancel)."""
        return int(val["operateType"]) == 1

    @classmethod
    def is_sell_cancel(cls, val):
        """True when ``operateType`` is 3 (sell-cancel)."""
        return int(val["operateType"]) == 3

    @classmethod
    def is_sell(cls, val):
        """True when ``operateType`` is 2 (sell)."""
        return int(val["operateType"]) == 2

    @classmethod
    def is_buy(cls, val):
        """True when ``operateType`` is 0 (buy)."""
        return int(val["operateType"]) == 0

    @classmethod
    def time_sub_as_ms(cls, val1, val2):
        """Return ``val1`` time minus ``val2`` time in milliseconds.

        Uses ``tool.trade_time_sub`` for the ``time`` part and the ``tms``
        fields for the millisecond part.
        """
        sub_s = tool.trade_time_sub(val1["time"], val2["time"])
        sub_ms = int(val1["tms"]) - int(val2["tms"])
        return sub_s * 1000 + sub_ms

    @classmethod
    def get_time_with_ms(cls, val):
        """Return ``time`` with the zero-padded 3-digit ``tms`` appended after a dot."""
        return val["time"] + "." + "{:0>3}".format(int(val["tms"]))
| | | |
| | | |
class L2TradeQueueUtils(object):
    """Locate the traded-progress position by matching the buy-1 queue against today's buys."""

    @classmethod
    def __is_cancel(cls, code, data, total_datas, local_today_num_operate_map):
        """Return True when the buy record ``data`` has a matching buy-cancel.

        Cancels with the same num/price are traced back through
        ``local_today_buyno_map`` by order number.
        """
        val = data["val"]
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(
                    cancel_data, local_today_buyno_map.get(code))
                if buy_index == data["index"]:
                    return True
        return False

    @classmethod
    def find_traded_progress_index(cls, code, buy_1_price, total_datas, local_today_num_operate_map, queueList,
                                   last_index,
                                   latest_not_limit_up_time=None):
        """Find the index in ``total_datas`` where the buy-1 queue starts.

        Windows of ``queueList`` (from its full length down to just above
        half of it) are searched as consecutive runs in the sequence of
        candidate buys at the buy-1 price; the first trusted match wins.

        :param code: stock code
        :param buy_1_price: buy-1 price; its integer part is zero-padded to 4 digits for key lookup
        :param total_datas: today's records
        :param local_today_num_operate_map: map of "<num>-<operateType>-<price>" to records
        :param queueList: order sizes of the buy-1 queue
        :param last_index: candidates below this index are ignored (reset to 0 when that record was cancelled)
        :param latest_not_limit_up_time: candidates earlier than this are ignored; not applied when the queue has 3+ entries
        :return: index of the first matched record, or None for an empty queue
        :raises Exception: when no trusted match is found
        """
        # With 3 or more queue entries the non-limit-up time is not considered.
        if len(queueList) >= 3:
            latest_not_limit_up_time = None
        if len(queueList) == 0:
            return None
        # last_index must not be a cancelled buy; start from 0 if it is.
        if cls.__is_cancel(code, total_datas[last_index], total_datas, local_today_num_operate_map):
            last_index = 0

        # Zero-pad the integer part to 4 digits. The padding is computed once:
        # the previous `while find(".") < 4` loop never ended for a price
        # without a decimal point.
        buy_1_price_format = f"{buy_1_price}"
        dot_pos = buy_1_price_format.find(".")
        int_len = dot_pos if dot_pos >= 0 else len(buy_1_price_format)
        if int_len < 4:
            buy_1_price_format = "0" * (4 - int_len) + buy_1_price_format

        def find_traded_progress_index_simple(queues):
            """Return (start index, matched indexes) of ``queues`` among candidate buys, or (None, None)."""
            index_set = set()
            for num in queues:
                buy_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(num, "0", buy_1_price_format))
                if not buy_datas:
                    continue
                for data in buy_datas:
                    # Only buys at/after the latest non-limit-up buy-1 update count.
                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                               latest_not_limit_up_time) >= 0:
                        if data["index"] >= last_index:
                            index_set.add(data["index"])
            # Expand every candidate by its repeat count, in index order.
            num_list = []
            new_index_list = []
            for index in sorted(index_set):
                for _ in range(0, total_datas[index]["re"]):
                    num_list.append(str(total_datas[index]["val"]["num"]))
                    new_index_list.append(index)
            # Element-wise run search; the former substring search on a
            # comma-joined string could match in the middle of a number
            # (e.g. "2,12" inside "12,12").
            wanted = [str(q) for q in queues]
            width = len(wanted)
            for start in range(len(num_list) - width + 1):
                if num_list[start:start + width] == wanted:
                    return new_index_list[start], new_index_list[start:start + width]
            return None, None

        def is_trust(indexes):
            """Decide whether the matched positions are plausible as one contiguous queue."""
            cha = [indexes[i] - indexes[i - 1] - 1 for i in range(1, len(indexes))]
            if len(cha) <= 1:
                return True
            # Gaps with a standard deviation below 10 are accepted outright.
            if numpy.std(cha) < 10:
                return True
            for i in range(0, len(cha)):
                if abs(cha[i]) > 10:
                    # Large gap: count uncancelled limit-up buys in between.
                    # NOTE(review): the range stops at indexes[i + 1] - 2, so the
                    # record right before the next match is not inspected - confirm.
                    buy_count = 0
                    for index in range(indexes[i] + 1, indexes[i + 1] - 1):
                        if L2DataUtil.is_limit_up_price_buy(total_datas[index]["val"]):
                            if not cls.__is_cancel(code, total_datas[index], total_datas,
                                                   local_today_num_operate_map):
                                buy_count += total_datas[index]["re"]
                    # Tolerate fewer than 3 stray buys.
                    if buy_count >= 3:
                        return False
            return True

        # Window search: window length from len(queueList) down to just above len(queueList) // 2.
        max_win_len = len(queueList)
        min_win_len = len(queueList) // 2
        if max_win_len == min_win_len:
            min_win_len = max_win_len - 1
        for win_len in range(max_win_len, min_win_len, -1):
            # Slide the window over the queue.
            for i in range(0, max_win_len - win_len + 1):
                queues = queueList[i:i + win_len]
                f_start_index, f_indexs = find_traded_progress_index_simple(queues)
                # Index 0 is a valid match; test against None, not truthiness.
                if f_start_index is not None and is_trust(f_indexs):
                    return f_start_index

        raise Exception("尚未找到成交进度")
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(L2DataUtil.get_time_with_ms({"time": "10:00:00", "tms": 490})) |
New file |
| | |
| | | """ |
| | | L2成交数据处理器 |
| | | """ |
| | | import json |
| | | |
| | | from db import redis_manager |
| | | from db.redis_manager_delegate import RedisUtils |
| | | # from l2 import l2_log |
| | | from l2.huaxin import l2_huaxin_util |
| | | |
| | | from log_module import async_log_util |
| | | from log_module.log import hx_logger_l2_transaction_desc, hx_logger_l2_transaction_sell_order |
| | | |
| | | from utils import tool |
| | | |
| | | |
class HuaXinBuyOrderManager:
    """Singleton that tracks, per code, the buy order currently being filled."""
    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)

    # Order currently being filled, per code:
    # [order no, traded volume, first trade time, last trade time, total buy volume]
    __dealing_order_info_dict = {}
    # Most recently finished order, per code: (order no, fully filled?)
    __latest_deal_order_info_dict = {}

    def __new__(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        cls.__instance = super(HuaXinBuyOrderManager, cls).__new__(cls, *args, **kwargs)
        # First instantiation: restore persisted state.
        cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Restore the in-progress order info of every code from Redis."""
        redis_conn = cls.__get_redis()
        try:
            for redis_key in RedisUtils.keys(redis_conn, "dealing_order_info-*"):
                code = redis_key.split("-")[-1]
                stored = json.loads(RedisUtils.get(redis_conn, redis_key))
                tool.CodeDataCacheUtil.set_cache(cls.__dealing_order_info_dict, code, stored)
        finally:
            RedisUtils.realse(redis_conn)

    def sync_dealing_data_to_db(self):
        """Write the in-progress order info of every code to Redis."""
        for code in self.__dealing_order_info_dict:
            RedisUtils.setex(self.__get_redis(), f"dealing_order_info-{code}", tool.get_expire(),
                             json.dumps(self.__dealing_order_info_dict[code]))

    @classmethod
    def get_dealing_order_info(cls, code):
        """Return the in-progress order info for ``code``.

        :return: [order no, traded volume, first trade time, last trade time,
            total buy volume], or None when nothing is tracked
        """
        return cls.__dealing_order_info_dict.get(code)

    @classmethod
    def statistic_deal_desc(cls, code, data, total_buy_num):
        """Feed one trade into the per-order statistics.

        :param code: stock code
        :param data: trade tuple; index 2 is the volume, 3 the time, 6 the buy order number
        :param total_buy_num: total volume of the buy order
        :return: ``(order no, fully filled?)`` of the previous order when this
            trade belongs to a new order, otherwise None
        """
        tracked = cls.__dealing_order_info_dict
        if code not in tracked:
            tracked[code] = [data[6], 0, data[3], data[3], total_buy_num]

        current = tracked[code]
        if current[0] == data[6]:
            # Same order still being filled: accumulate.
            current[1] += data[2]
            current[3] = data[3]
            return None

        # A different order started: log and seal the previous one.
        async_log_util.info(hx_logger_l2_transaction_desc, f"{code}#{cls.__dealing_order_info_dict[code]}")
        deal_info = (current[0], current[4] == current[1])
        cls.__latest_deal_order_info_dict[code] = deal_info
        # Start tracking the new order with this trade.
        tracked[code] = [data[6], data[2], data[3], data[3], total_buy_num]
        return deal_info
| | | |
| | | |
# Sell-order statistics
class HuaXinSellOrderStatisticManager:
    # Per code, the raw trade tuples received so far (trimmed at the end of
    # add_transaction_datas), format: {code: [trade tuple, ...]}
    __latest_sell_order_info_list_dict = {}

    # Numbers of big sell orders, format: {code: {sell order no}}
    __big_sell_order_ids_dict = {}

    # Big sell order number -> sell order info, per code
    __big_sell_order_info_dict = {}

    # List of sealed big sell orders, per code
    __big_sell_order_info_list_dict = {}

    # Sell order currently being filled, format:
    # {code: [sell order no, total volume, price, (start time, buy order no), (end time, buy order no)]}
    __latest_sell_order_dict = {}

    # Accumulates trades into sell orders and summarises big sell orders
    # (volume * price >= 500000) of roughly the last second.
    # Returns [total amount, [sell order info, ...]] where each info is
    # [sell order no, total volume, price, (start time, buy order no), (end time, buy order no)].
    # NOTE(review): an empty ``datas`` fails at ``datas[-1]`` below - confirm callers never pass one.
    @classmethod
    def add_transaction_datas(cls, code, datas):
        # Layout of each trade tuple, as built by the producer:
        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
        #           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
        #           data['SellNo'], data['ExecType']))
        if code not in cls.__latest_sell_order_info_list_dict:
            cls.__latest_sell_order_info_list_dict[code] = []
        if code not in cls.__big_sell_order_ids_dict:
            cls.__big_sell_order_ids_dict[code] = set()
        if code not in cls.__big_sell_order_info_dict:
            cls.__big_sell_order_info_dict[code] = {}

        if code not in cls.__big_sell_order_info_list_dict:
            cls.__big_sell_order_info_list_dict[code] = []

        for d in datas:
            cls.__latest_sell_order_info_list_dict[code].append(d)
            if code not in cls.__latest_sell_order_dict:
                cls.__latest_sell_order_dict[code] = [d[7], d[2], d[1], (d[3], d[6]), (d[3], d[6])]
            else:
                if cls.__latest_sell_order_dict[code][0] == d[7]:
                    # Same sell order: accumulate volume, take the latest price and end marker.
                    cls.__latest_sell_order_dict[code][1] += d[2]
                    cls.__latest_sell_order_dict[code][2] = d[1]
                    cls.__latest_sell_order_dict[code][4] = (d[3], d[6])
                else:
                    info = cls.__latest_sell_order_dict[code]
                    # The previous sell order is finished: seal it and start a new one.
                    # Only orders of at least 500000 are recorded as big orders.
                    if info[1] * info[2] >= 500000:
                        async_log_util.info(hx_logger_l2_transaction_sell_order, f"{code}#{cls.__latest_sell_order_dict[code]}")
                        cls.__big_sell_order_ids_dict[code].add(info[0])
                        cls.__big_sell_order_info_dict[code][info[0]] = info
                        cls.__big_sell_order_info_list_dict[code].append(info)

                    cls.__latest_sell_order_dict[code] = [d[7], d[2], d[1], (d[3], d[6]), (d[3], d[6])]
        latest_time = l2_huaxin_util.convert_time(datas[-1][3], with_ms=True)
        min_time = tool.trade_time_add_millionsecond(latest_time, -1000)
        min_time_int = int(min_time.replace(":", "").replace(".", ""))
        # Summarise the big sell orders of the last second
        total_big_sell_datas = cls.__big_sell_order_info_list_dict.get(code)
        start_index = 0

        total_sell_info = [0, None]  # [total amount, list of big sell orders]
        latest_sell_order_info = cls.__latest_sell_order_dict[code]
        big_sell_order_ids = cls.__big_sell_order_ids_dict[code]
        # print("大卖单", big_sell_order_ids)
        big_sell_orders = []
        temp_sell_order_ids = set()
        # Walk the sealed big orders from newest to oldest
        print(f"总大单数量:{len(total_big_sell_datas)}")
        for i in range(len(total_big_sell_datas) - 1, -1, -1):
            bd = total_big_sell_datas[i]
            if bd[0] != latest_sell_order_info[0]:
                # Not the order in progress and not a big order: skip
                if bd[0] not in big_sell_order_ids:
                    continue
                else:
                    if bd[0] not in temp_sell_order_ids:
                        big_sell_orders.append(cls.__big_sell_order_info_dict[code].get(bd[0]))
                        temp_sell_order_ids.add(bd[0])
            else:
                # The order in progress, but not big: skip
                if latest_sell_order_info[1] * latest_sell_order_info[2] < 500000:
                    continue
                else:
                    if latest_sell_order_info[0] not in temp_sell_order_ids:
                        big_sell_orders.append(latest_sell_order_info)
                        temp_sell_order_ids.add(latest_sell_order_info[0])

            # NOTE(review): the order is appended to big_sell_orders above before
            # this window check, so the first out-of-window order is still
            # returned (but not summed) - confirm this is intended.
            if min_time_int > int(
                    l2_huaxin_util.convert_time(bd[3][0], with_ms=True).replace(":", "").replace(".", "")):
                start_index = i
                break
            # Sum the amount of big sell orders inside the window
            total_sell_info[0] += int(bd[1] * bd[2])
        # Include the order in progress when it is already big
        if latest_sell_order_info[1] * latest_sell_order_info[2] >= 500000:
            if latest_sell_order_info[0] not in temp_sell_order_ids:
                big_sell_orders.append(latest_sell_order_info)
                temp_sell_order_ids.add(latest_sell_order_info[0])

        big_sell_orders.reverse()
        total_sell_info[1] = big_sell_orders
        # NOTE(review): start_index is a position in the sealed big-order list
        # but is used here to slice the raw trade list - confirm.
        cls.__latest_sell_order_info_list_dict[code] = cls.__latest_sell_order_info_list_dict[code][start_index:]
        return total_sell_info
New file |
| | |
| | | import datetime |
| | | import hashlib |
| | | import logging |
| | | import os |
| | | import time |
| | | |
| | | import constant |
| | | from utils import tool |
| | | |
| | | |
class LogUtil:
    """Small helpers for slicing log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy the lines of ``path`` that contain ``key`` into ``target_path``.

        A line only qualifies when the key occurs after its first character.
        ``target_path`` is created/truncated before ``path`` is opened.
        """
        needle = "{}".format(key)
        with open(target_path, mode='w', encoding="utf-8") as out:
            with open(path, 'r', encoding="utf-8") as src:
                all_lines = src.readlines()
            for text in all_lines:
                if text.find(needle) > 0:
                    out.write(text)
| | | |
| | | |
def __export_l2_pos_range(code, date, dir):
    """Export the "processed data range" log lines of ``code`` for ``date`` into ``dir``."""
    wanted = "{} 处理数据范围".format(code)
    source_path = "{}/sell_logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), date)
    target_path = "{}/l2_process_{}.log".format(dir, date)
    LogUtil.extract_log_from_key(wanted, source_path, target_path)
| | | |
| | | |
def __export_l2_trade_log(code, date, dir):
    """Export the trade log lines of ``code`` for ``date`` into ``dir``."""
    source_path = "{}/sell_logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), date)
    target_path = "{}/l2_trade_{}.log".format(dir, date)
    LogUtil.extract_log_from_key(code, source_path, target_path)
| | | |
| | | |
def __export_l2_trade_cancel_log(code, date, dir):
    """Export the trade-cancel log lines of ``code`` for ``date`` into ``dir``."""
    source_path = "{}/sell_logs/gp/l2/l2_trade_cancel.{}.log".format(constant.get_path_prefix(), date)
    target_path = "{}/l2_trade_cancel_{}.log".format(dir, date)
    LogUtil.extract_log_from_key(code, source_path, target_path)
| | | |
| | | |
def __analyse_pricess_time():
    """Print every line of today's l2_process log whose trailing number exceeds 150.

    The number is whatever follows the last ":" on the line.
    """
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    file_path = f"{constant.get_path_prefix()}/sell_logs/gp/l2/l2_process.{today}.log"
    with open(file_path, encoding="utf-8") as f:
        while True:
            line = f.readline()
            if not line:
                break
            if int(line.split(":")[-1]) > 150:
                print(line)
| | | |
| | | |
def export_l2_log(code):
    """Export today's L2 process, trade-cancel and trade logs for ``code``.

    The extracts are written into ``<prefix>/sell_logs/gp/l2/<code>/``, which
    is created when missing. Codes shorter than 6 characters are ignored.

    :param code: security code used both as the filter key and as the name of
        the output directory.
    :return: ``None``.
    """
    if len(code) < 6:
        return
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    dir_ = "{}/sell_logs/gp/l2/{}".format(constant.get_path_prefix(), code)
    # makedirs(exist_ok=True) also creates missing parent directories and is
    # free of the check-then-create race of os.path.exists() + os.mkdir().
    os.makedirs(dir_, exist_ok=True)
    __export_l2_pos_range(code, date, dir_)
    __export_l2_trade_cancel_log(code, date, dir_)
    __export_l2_trade_log(code, date, dir_)
| | | |
| | | |
def compute_buy1_real_time(time_):
    """Shift an ``HH:MM:SS`` time back by 2 seconds, then floor it to a multiple of 3 seconds.

    The result is rendered through ``tool.time_seconds_format``.
    """
    parts = time_.split(":")
    total_seconds = int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
    shifted = total_seconds - 2
    return tool.time_seconds_format(shifted - shifted % 3)
| | | |
| | | |
def load_l2_from_log(date=None):
    """Load the L2 data recorded in the l2_data log of ``date``.

    Each log line is expected to carry a 6-character code followed by a Python
    literal; the per-line values of the same code are concatenated.

    :param date: date string used in the log file name; defaults to
        ``tool.get_now_date_str()``.
    :return: dict mapping code -> accumulated data. Whatever was parsed before
        a failure is still returned (best-effort loading).
    """
    today_data = {}
    if date is None:
        date = tool.get_now_date_str()
    try:
        with open("{}/sell_logs/gp/l2/l2_data.{}.log".format(constant.get_path_prefix(), date), mode='r') as f:
            for data in f:
                # Skip the logger prefix that ends with ' - '.
                index = data.find(' - ') + 2
                if data.find('async_log_util') > 0:
                    # Async log lines carry an extra "[...]" segment.
                    index = data.find(']', index) + 1
                data = data[index + 1:].strip()
                code = data[0:6]
                data = data[7:]
                # NOTE(security): eval() executes arbitrary expressions. It is
                # kept because the file is produced by this application, but it
                # must never be pointed at untrusted input.
                dict_ = eval(data)
                if code not in today_data:
                    today_data[code] = dict_
                else:
                    today_data[code].extend(dict_)
        for key in today_data:
            print(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    except FileNotFoundError:
        # No log for this date: nothing to load.
        pass
    except Exception as e:
        # Stay best-effort, but leave a trace instead of hiding the failure
        # (a bare ``except`` would also swallow KeyboardInterrupt/SystemExit).
        logging.exception(e)
    return today_data
| | | |
| | | |
def __get_log_time(line):
    """Return the ``HH:MM:SS`` part of a log line.

    The time is taken from the text before the first ``|``: its second
    space-separated token, with any fractional seconds dropped.
    """
    head = line.partition("|")[0]
    clock = head.split(" ")[1]
    return clock.partition(".")[0]
| | | |
| | | |
def __get_async_log_time(line):
    """Return the 8 characters following the first ``[`` after the `` - `` separator."""
    body = line.split(" - ")[1]
    start = body.find("[") + 1
    return body[start:start + 8]
| | | |
| | | |
def get_l2_process_position(code, date=None):
    """Return the index ranges of each L2 batch processed for ``code``.

    Reads the l2_process log of ``date`` (today when omitted). Only lines
    logged between 09:30:00 and 15:00:00 are kept, and only when their range
    starts after the end of the last kept range.

    :return: list of ``(start, end)`` integer tuples.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = "{}/sell_logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), date)
    code_tag = "code:{}".format(code)
    range_tag = "处理数据范围"
    pos_list = []
    with open(log_path, mode='r', encoding="utf-8") as f:
        for line in f:
            if code_tag not in line:
                continue
            time_ = __get_log_time(line)
            range_text = line[line.find(range_tag) + len(range_tag) + 1:line.find("处理时间")].strip()
            # Skip ranges that do not start after the last recorded range.
            if pos_list and pos_list[-1][1] >= int(range_text.split("-")[0]):
                continue
            if not (93000 <= int(time_.replace(":", "")) <= 150000):
                continue
            try:
                pos_list.append((int(range_text.split("-")[0]), int(range_text.split("-")[1])))
            except Exception as e:
                logging.exception(e)
    return pos_list
| | | |
| | | |
def get_l2_trade_position(code, date=None):
    """Return the trade event positions logged for ``code``.

    Reads the l2_trade log of ``date`` (today when omitted) and keeps lines
    logged between 09:30:00 and 15:00:00 that carry one of three markers.

    :return: list of ``(kind, index, reason)`` tuples where ``kind`` is
        0 for a buy-signal start, 1 for a buy execution position and 2 for a
        cancel position; ``reason`` is the text after the cancel-reason marker
        for kind 2 and ``""`` otherwise.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    log_path = "{}/sell_logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), date)
    code_tag = "code={}".format(code)
    # (kind, text that must appear after column 0, separator preceding the index)
    events = (
        (0, "获取到买入信号起始点", "获取到买入信号起始点:"),
        (1, "获取到买入执行位置", "获取到买入执行位置:"),
        (2, "触发撤单,撤单位置:", "触发撤单,撤单位置:"),
    )
    pos_list = []
    with open(log_path, mode='r', encoding="utf-8") as f:
        for line in f:
            if code_tag not in line:
                continue
            clock = int(__get_log_time(line).replace(":", ""))
            if clock < 93000 or clock > 150000:
                continue
            for kind, probe, separator in events:
                if line.find(probe) > 0:
                    tail = line.split(separator)[1].strip()
                    index = tail[0:tail.find(" ")].strip()
                    pos_list.append((kind, int(index), line.split("撤单原因:")[1] if kind == 2 else ""))
                    break
    return pos_list
| | | |
| | | |
| | | |
# Cache of file contents: md5(path) -> (load timestamp, list of lines)
__log_file_contents = {}


def __load_file_content(path_str, expire_timespace=20):
    """Return the lines of ``path_str``, cached for ``expire_timespace`` seconds.

    A missing file yields (and caches) an empty list.
    """
    cache_key = hashlib.md5(path_str.encode(encoding='utf-8')).hexdigest()
    cached = __log_file_contents.get(cache_key)
    if cached is not None and time.time() - cached[0] < expire_timespace:
        return cached[1]
    contents = []
    if os.path.exists(path_str):
        with open(path_str, 'r', encoding="utf-8") as f:
            contents.extend(f)
    __log_file_contents[cache_key] = (time.time(), contents)
    return contents
| | | |
| | | |
| | | |
def load_huaxin_deal_record(code, date=None):
    """Load the L2 order deal records logged for ``code``.

    Lines of the ``transaction_desc`` log that contain ``"<code>#"`` are
    parsed; the Python literal following the ``#`` is collected.

    :param code: security code to filter on.
    :param date: date string used in the log file name. Defaults to today,
        resolved at call time (a default evaluated at import time would go
        stale once the process runs past midnight).
    :return: list of parsed records. Per the original note the expected shape
        is [(order no, lots, deal start time, deal end time, ordered lots)] --
        TODO confirm against the log writer.
    """
    if date is None:
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_desc.{date}.log"
    fdatas = []
    marker = f"{code}#"
    for line in __load_file_content(path):
        if line.find(marker) > 0:
            payload = line.split(" - ")[1]
            payload = payload[payload.find("]") + 1:].strip()
            # The filter argument is deliberately not reassigned from the
            # parsed line, so every line is matched against the caller's code.
            # NOTE(security): eval() executes arbitrary expressions; acceptable
            # only because the log is written by this application.
            fdatas.append(eval(payload.split("#")[1]))
    return fdatas
| | | |
| | | |
| | | |
def load_huaxin_transaction_sell_no(code=None, date=None):
    """Load the Huaxin transaction sell-order records, grouped by code.

    :param code: when given, only records of this code are returned.
    :param date: date string used in the log file name. Defaults to today,
        resolved at call time (a default evaluated at import time would go
        stale once the process runs past midnight).
    :return: dict mapping code -> list of parsed records; empty when the log
        file does not exist.
    :raises IndexError: if a line lacks the `` - `` separator or ``code=``.
    """
    if date is None:
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction_sell_order.{date}.log"
    fdatas = {}
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8") as f:
            for line in f:
                data = line.split(" - ")[1].strip()
                if data.startswith("["):
                    # Drop the leading "[...]" segment of async log lines.
                    data = data[data.find("]") + 1:].strip()
                data = data.split("code=")[1]
                code_ = data[:6]
                if code and code != code_:
                    continue
                data = data[6:].strip()
                # NOTE(security): eval() executes arbitrary expressions;
                # acceptable only because the log is written by this application.
                fdatas.setdefault(code_, []).append(eval(data))
    return fdatas
| | | |
| | | |
| | | |
| | | |
def load_system_log():
    """Read today's system log and return its INFO and ERROR entries.

    Lines are expected to look like ``<time> | <level> | <source> - <message>``;
    lines that do not have this shape are skipped.

    :return: list of ``(time_str, level, message)`` tuples; empty when the log
        file does not exist.
    """
    path = f"{constant.get_path_prefix()}/sell_logs/gp/system/system.{tool.get_now_date_str()}.log"
    fdatas = []
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8") as f:
            for line in f:
                fields = line.split("|")
                try:
                    time_str = fields[0].strip()
                    level = fields[1].strip()
                    if level != "INFO" and level != "ERROR":
                        continue
                    data = fields[2].split(" - ")[1].strip()
                except IndexError:
                    # Malformed line (missing field or separator): skip it.
                    # Only IndexError can occur here, so nothing else is
                    # swallowed (a bare ``except`` would also trap
                    # KeyboardInterrupt/SystemExit).
                    continue
                fdatas.append((time_str, level, data))
    return fdatas
| | | |
| | | |
def load_huaxin_transaction_map(date=None):
    """Load the Huaxin L2 transaction log, grouped by code.

    Each line is expected to carry ``<code>#<python literal>`` after the
    `` - `` separator; lines that cannot be parsed are skipped.

    :param date: date string used in the log file name. Defaults to today,
        resolved at call time (a default evaluated at import time would go
        stale once the process runs past midnight).
    :return: dict mapping code -> list of parsed values; empty when the log
        file does not exist.
    """
    if date is None:
        date = tool.get_now_date_str()
    path = f"{constant.get_path_prefix()}/sell_logs/huaxin/l2/transaction.{date}.log"
    fdatas = {}
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8") as f:
            for line in f:
                try:
                    data = line.split(" - ")[1].strip()
                    if data.startswith("["):
                        # Drop the leading "[...]" segment of async log lines.
                        data = data[data.find("]") + 1:].strip()
                    code = data.split("#")[0]
                    # NOTE(security): eval() executes arbitrary expressions;
                    # acceptable only because the log is written by this
                    # application.
                    l2_data = eval(data.split("#")[1])
                except Exception:
                    # Best-effort per-line parsing: skip unparsable lines, but
                    # let KeyboardInterrupt/SystemExit propagate.
                    continue
                fdatas.setdefault(code, []).append(l2_data)
    return fdatas
| | | |
| | |
| | | import multiprocessing |
| | | import threading |
| | | |
| | | from huaxin_client import l1_client_for_trade, trade_client |
| | | from huaxin_client import l1_client_for_trade, trade_client, l2_client |
| | | from trade import trade_strategy |
| | | import huaxin_client |
| | | |
| | |
| | | # 策略读交易写 |
| | | queue_strategy_r_trade_w = multiprocessing.Queue() |
| | | |
| | | queue_strategy_w_l2_r = multiprocessing.Queue() |
| | | |
| | | # 启动L1 |
| | | l1TradeProcess = multiprocessing.Process(target=l1_client_for_trade.run, |
| | | args=(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w,)) |
| | |
| | | target=trade_client.run, |
| | | args=(queue_strategy_r_trade_w, queue_strategy_w_trade_r)) |
| | | tradeProcess.start() |
| | | trade_strategy.run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r) |
| | | |
| | | trade_strategy.init_l2_data_callbacks() |
| | | threading.Thread(target=lambda: l2_client.run(queue_strategy_w_l2_r, trade_strategy.l2_data_callbacks), |
| | | daemon=True).start() |
| | | trade_strategy.run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, |
| | | queue_strategy_w_trade_r,queue_strategy_w_l2_r) |
| | |
| | | need_update_codes.append(d["securityID"]) |
| | | if need_update_codes: |
| | | gpcode_manager.request_price_pre(need_update_codes) |
| | | |
| | | queue_l1_trade_r_strategy_w.put_nowait( |
| | | {"type": "set_target_codes", "data": list(position_codes)}) |
| | | queue_strategy_w_l2_r.put_nowait( |
| | | {"type": "l2_cmd", "data": list(position_codes)}) |
| | | # 9点25之前需要订阅持仓票 |
| | | __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, datas) |
| | | async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}") |
| | |
| | | |
def test():
    """Manual check: after 2 seconds, push a fixed ``set_target_codes`` message onto the queue."""
    time.sleep(2)
    # A single assignment; the duplicated line was residue of the old/new diff.
    position_codes = ["000333"]
    queue_l1_trade_r_strategy_w.put_nowait(
        {"type": "set_target_codes", "data": list(position_codes)})
| | | |
| | | |
| | | # 运行 |
| | | def run(queue_l1_trade_r_strategy_w_): |
| | | global queue_l1_trade_r_strategy_w |
| | | def run(queue_l1_trade_r_strategy_w_, queue_strategy_w_l2_r_): |
| | | global queue_l1_trade_r_strategy_w, queue_strategy_w_l2_r |
| | | queue_l1_trade_r_strategy_w = queue_l1_trade_r_strategy_w_ |
| | | queue_strategy_w_l2_r = queue_strategy_w_l2_r_ |
| | | t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True) |
| | | t1.start() |
| | | # threading.Thread(target=lambda: test(), daemon=True).start() |
| | |
| | | |
| | | import schedule |
| | | |
| | | import constant |
| | | from code_atrribute import gpcode_manager |
| | | from code_atrribute.history_k_data_util import HistoryKDatasUtils |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from l2 import l2_data_util |
| | | from l2.huaxin import l2_huaxin_util |
| | | from l2.l2_data_util import local_today_datas, local_today_num_operate_map, local_today_buyno_map, \ |
| | | local_today_canceled_buyno_map, L2DataUtil |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_trade, logger_debug, logger_system, logger_local_huaxin_l1_trade_info, \ |
| | | logger_trade_position_api_request |
| | | logger_trade_position_api_request, logger_l2_error, hx_logger_l2_transaction |
| | | from trade import huaxin_trade_data_update, huaxin_sell_util, huaxin_trade_api |
| | | from trade.huaxin_trade_record_manager import PositionManager |
| | | from trade.sell_rule_manager import TradeRuleManager, SellRule |
| | |
| | | codes = L1DataProcessor.get_latest_update_codes() |
| | | result = {"code": 0, "data": list(codes)} |
| | | self.send_response(result, client_id, request_id) |
| | | elif ctype == "get_l2_datas": |
| | | # 获取L2数据 |
| | | code = data["code"] |
| | | max_time = data["max_time"] |
| | | min_money = data["min_money"] |
| | | if not local_today_datas: |
| | | l2_data_util.load_l2_data_all(True) |
| | | total_datas = l2_data_util.local_today_datas.get(code) |
| | | # 只获取买与卖 |
| | | for d in total_datas: |
| | | val = d['val'] |
| | | if tool.trade_time_sub(d['val']['time'], max_time) > 0: |
| | | break |
| | | if val['num'] * float(val['price']) * 100 < min_money: |
| | | continue |
| | | if L2DataUtil.is_buy(val): |
| | | pass |
| | | if L2DataUtil.is_sell(val): |
| | | pass |
| | | |
| | | codes = L1DataProcessor.get_latest_update_codes() |
| | | result = {"code": 0, "data": list(codes)} |
| | | self.send_response(result, client_id, request_id) |
| | | |
| | | |
| | | class L1DataProcessor: |
| | |
| | | datas = data["data"] |
| | | cls.__save_l1_current_price(datas) |
| | | cls.process_for_sell(datas) |
| | | |
| | | |
| | | |
| | | @classmethod |
| | | def process_for_sell(cls, datas): |
| | |
| | | logging.exception(e) |
| | | |
| | | |
class MyL2DataCallback(L2DataCallBack):
    # Callback that formats incoming L2 order data, caches it in the
    # module-level maps and persists it; transactions are only logged.

    def OnL2Order(self, code, origin_datas, timestamp):
        # Handle a batch of raw L2 order data for `code`.
        # `timestamp` is not used in this method.
        # Save L2 data
        datas = None
        try:
            # Convert the data format
            _start_index = 0
            total_datas = local_today_datas.get(code)
            if code not in local_today_datas:
                local_today_datas[code] = []
            if total_datas:
                # Continue numbering after the last cached record.
                _start_index = total_datas[-1]["index"] + 1
            datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas,
                                                       gpcode_manager.get_limit_up_price(code), _start_index)
            if len(datas) > 0:
                local_today_datas[code].extend(datas)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, datas)
                l2_data_util.load_buy_no_map(local_today_buyno_map, code, datas)
                l2_data_util.load_canceled_buy_no_map(local_today_canceled_buyno_map, code, datas)
        except Exception as e:
            async_log_util.error(logger_l2_error, f"code:{code}")
            logger_l2_error.exception(e)
        finally:
            # Persist whatever was formatted, even if caching failed above.
            if datas:
                l2_data_util.save_l2_data(code, None, datas)
            # NOTE(review): the source lost its indentation; this clear() is
            # assumed to run unconditionally in `finally` -- confirm.
            origin_datas.clear()

    def OnL2Transaction(self, code, datas):
        # Log the transaction payload as "<code>#<datas>".
        async_log_util.info(hx_logger_l2_transaction, f"{code}#{datas}")
| | | |
| | | |
| | | # 做一些初始化的操作 |
| | | def __init(): |
| | | def run_pending(): |
| | |
| | | threading.Thread(target=request_position, daemon=True).start() |
| | | |
| | | |
| | | def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r): |
| | | l2_data_callbacks = [] |
| | | |
| | | |
def init_l2_data_callbacks():
    """Append one ``MyL2DataCallback`` per L2 channel to ``l2_data_callbacks``.

    The number of callbacks is ``constant.MAX_L2_CHANNEL_COUNT``.
    """
    l2_data_callbacks.extend(MyL2DataCallback() for _ in range(constant.MAX_L2_CHANNEL_COUNT))
| | | |
| | | |
| | | def run(queue_l1_trade_w_strategy_r, queue_l1_trade_r_strategy_w, queue_strategy_r_trade_w, queue_strategy_w_trade_r, |
| | | queue_strategy_w_l2_r): |
| | | try: |
| | | # 初始化 |
| | | __init() |
| | |
| | | |
| | | threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start() |
| | | |
| | | huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w) |
| | | huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w, queue_strategy_w_l2_r) |
| | | |
| | | huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r) |
| | | |