| | |
| | | import threading |
| | | import time |
| | | |
| | | from huaxin_client import command_manager |
| | | from huaxin_client import command_manager, l2_data_transaction_protocol |
| | | from huaxin_client import constant |
| | | from huaxin_client import l2_data_manager |
| | | import lev2mdapi |
| | |
| | | for c in del_codes: |
| | | l2_data_manager.target_codes.discard(c) |
| | | for c in add_codes: |
| | | l2_data_manager.run_upload_task(c) |
| | | l2_data_manager.run_upload_task(c, pipe_strategy) |
| | | self.__subscribe(add_codes) |
| | | self.__unsubscribe(del_codes) |
| | | |
| | |
| | | data = json.loads(val) |
| | | if data["data"]["type"] == "l2_cmd": |
| | | l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | else: |
| | | l2_data_transaction_protocol.set_write_rece_data(data) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | def run(pipe_trade, pipe_strategy): |
| | | pipe_strategy = None |
| | | |
| | | |
| | | def run(pipe_trade, _pipe_strategy): |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | if pipe_trade is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(pipe_trade), daemon=True) |
| | | t1.start() |
| | | if pipe_strategy is not None: |
| | | t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(pipe_strategy,), daemon=True) |
| | | if _pipe_strategy is not None: |
| | | global pipe_strategy |
| | | pipe_strategy = _pipe_strategy |
| | | t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True) |
| | | t1.start() |
| | | __init_l2() |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_upload_trading_canceled() |
| | | l2_data_manager.run_log() |
| | | # l2_data_manager.run_test() |
| | | l2_data_manager.run_test() |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | | l2CommandManager.init(MyL2ActionCallback()) |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import contextlib |
| | | import json |
| | | import logging |
| | | import mmap |
| | | import queue |
| | | import random |
| | | import threading |
| | | import time |
| | | from huaxin_client import socket_util |
| | | from huaxin_client import socket_util, l2_data_transaction_protocol |
| | | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # 活动时间 |
| | | from log_module import log_export |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \ |
| | | logger_local_huaxin_g_cancel |
| | | logger_local_huaxin_g_cancel, hx_logger_contact_debug |
| | | |
| | | order_detail_upload_active_time_dict = {} |
| | | transaction_upload_active_time_dict = {} |
| | |
| | | |
| | | |
| | | # 循环读取上传数据 |
| | | def __run_upload_order(code): |
| | | def __run_upload_order(code, pipe): |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | while True: |
| | | # print("order task") |
| | | try: |
| | | if code not in target_codes: |
| | | break |
| | | order_detail_upload_active_time_dict[code] = time.time() |
| | | udatas = [] |
| | | while not tmep_order_detail_queue_dict[code].empty(): |
| | | temp = tmep_order_detail_queue_dict[code].get() |
| | | udatas.append(temp) |
| | | if udatas: |
| | | start_time = time.time() |
| | | upload_data(code, "l2_order", udatas) |
| | | use_time = int((time.time() - start_time) * 1000) |
| | | if use_time > 20: |
| | | logger_local_huaxin_l2_upload.info(f"{code}-上传代码耗时:{use_time}ms") |
| | | |
| | | time.sleep(0.01) |
| | | with contextlib.closing(mmap.mmap(-1, 1000 * 100, tagname=l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code), access=mmap.ACCESS_WRITE)) as _mmap: |
| | | while True: |
| | | # print("order task") |
| | | try: |
| | | if code not in target_codes: |
| | | break |
| | | # 打开共享内存 |
| | | order_detail_upload_active_time_dict[code] = time.time() |
| | | udatas = [] |
| | | while not tmep_order_detail_queue_dict[code].empty(): |
| | | temp = tmep_order_detail_queue_dict[code].get() |
| | | udatas.append(temp) |
| | | if udatas: |
| | | start_time = time.time() |
| | | # upload_data(code, "l2_order", udatas) |
| | | l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas) |
| | | use_time = int((time.time() - start_time) * 1000) |
| | | if use_time > 20: |
| | | logger_local_huaxin_l2_upload.info(f"{code}-上传代码耗时:{use_time}ms") |
| | | |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}") |
| | | pass |
| | | time.sleep(0.01) |
| | | |
| | | except Exception as e: |
| | | hx_logger_contact_debug.exception(e) |
| | | logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}") |
| | | pass |
| | | |
| | | |
| | | def __run_upload_trans(code): |
| | |
| | | |
| | | |
| | | # 运行上传任务 |
| | | def run_upload_task(code): |
| | | def run_upload_task(code, pipe_strategy): |
| | | # 如果代码没有在目标代码中就不需要运行 |
| | | if code not in target_codes: |
| | | return |
| | | # 如果最近的活动时间小于2s就不需要运行 |
| | | if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2: |
| | | t = threading.Thread(target=lambda: __run_upload_order(code), daemon=True) |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True) |
| | | t.start() |
| | | |
| | | if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2: |
| | |
| | | |
| | | |
def __test():
    """Test helper that keeps injecting a fixed sample record for code 002073.

    Ensures an order-detail queue exists for the code and registers the code
    in ``target_codes``. Then, every 5 seconds and without ever returning, it
    reports a canceled order and enqueues one hard-coded order-detail record.

    Failures are logged instead of being silently swallowed, and the pause
    happens after every iteration so a persistent error cannot turn the loop
    into a busy spin.
    """
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    target_codes.add(code)
    while True:
        try:
            trading_order_canceled("603106", 6114878)
            tmep_order_detail_queue_dict[code].put_nowait(
                ['002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224])
        except Exception as e:
            # Best-effort test loop: record the failure and keep going.
            logging.exception(e)
        # Sleep outside the try block so the delay also applies after a failure.
        time.sleep(5)
New file |
| | |
| | | """ |
| | | L2数据传输协议 |
| | | """ |
| | | import json |
| | | |
| | | # 写入结果字典 |
| | | import random |
| | | import time |
| | | |
| | | from log_module.log import hx_logger_contact_debug |
| | | |
| | | __write_result_dict = {} |
| | | |
| | | |
def __create_request_id(code, type_):
    """Build a request id of the form ``<code>_<type_>_<millis>_<random>``.

    ``millis`` is ``time.time() * 1000`` (a float, not truncated) and
    ``random`` is an integer drawn from ``[0, 100000]``.
    """
    millis = time.time() * 1000
    nonce = random.randint(0, 100000)
    return "{}_{}_{}_{}".format(code, type_, millis, nonce)
| | | |
| | | |
# Message type identifier used for L2 order-detail transfers.
TYPE_L2_ORDER = "l2_order"


def __get_mmap_tag_name(code, type):
    """Return the shared-memory tag name, formatted as ``<type>_<code>``."""
    # NOTE(review): the parameter name shadows the builtin ``type``; it is kept
    # as-is so the helper's signature stays unchanged.
    return "{}_{}".format(type, code)


def get_mmap_tag_name_for_l2_order(code):
    """Return the shared-memory tag name used for the L2 order data of ``code``."""
    tag_name = __get_mmap_tag_name(code, TYPE_L2_ORDER)
    return tag_name
| | | |
| | | |
# Send L2 order details
def send_l2_order_detail(pipe, _mmap, code, data_json):
    """Publish L2 order details via shared memory and wait for the acknowledgement.

    The payload ``{"code", "data", "time"}`` is written as UTF-8 JSON at the
    start of ``_mmap``. A JSON notification carrying a fresh request id is then
    sent through ``pipe``, and the call polls until ``set_write_rece_data`` has
    stored a result for that request id.

    Args:
        pipe: Object whose ``send`` method delivers the JSON notification string.
        _mmap: Writable mmap object that carries the payload.
        code: Code the order details belong to.
        data_json: JSON-serialisable order-detail data.

    Raises:
        Exception: If no acknowledgement is seen after 1000 polls spaced by a
            1 ms sleep.
        ValueError: Raised by ``mmap.write`` when the payload does not fit.
    """
    payload = json.dumps({"code": code, "data": data_json, "time": int(time.time() * 1000)}).encode("utf-8")
    request_id = __create_request_id(code, TYPE_L2_ORDER)
    try:
        _mmap.seek(0)
        _mmap.write(payload)
        _mmap.flush()
        hx_logger_contact_debug.info("L2客户端(code-{} request_id-{}):开始发送数据", code, request_id)
        pipe.send(json.dumps({"type": TYPE_L2_ORDER, "data": {"code": code}, "request_id": request_id}))
        for _ in range(1000):
            if request_id in __write_result_dict:
                hx_logger_contact_debug.info("L2客户端(code-{} request_id-{}):读取发送结果成功", code, request_id)
                break
            time.sleep(0.001)
        else:
            hx_logger_contact_debug.info("L2客户端(code-{} request_id-{}):读取发送结果超时", code, request_id)
            raise Exception("读取内容超时")
    finally:
        # Clear the shared memory. Writing b"" would leave the old bytes in
        # place, so overwrite what was written with NUL bytes; otherwise a later,
        # shorter payload would be followed by a stale tail of this one.
        _mmap.seek(0)
        _mmap.write(b"\x00" * min(len(payload), len(_mmap)))
        _mmap.flush()
        __write_result_dict.pop(request_id, None)
| | | |
| | | |
# Record received data
def set_write_rece_data(data_json):
    """Store a received message under its request id.

    Messages whose ``request_id`` is missing or falsy are ignored. Stored
    entries are what ``send_l2_order_detail`` polls for.
    """
    request_id = data_json.get("request_id")
    if not request_id:
        return
    hx_logger_contact_debug.info("L2客户端(code- request_id-{}):获取到反馈结果", request_id)
    __write_result_dict[request_id] = data_json
| | | |
| | | |
def set_read_l2_order(pipe, request_id):
    """Send the "l2_order data has been read" acknowledgement for ``request_id`` through ``pipe``."""
    hx_logger_contact_debug.info("策略客户端(code- request_id-{}):开始反馈结果", request_id)
    ack = {"request_id": request_id, "data": {"type": "l2_order"}}
    pipe.send(json.dumps(ack))
| | |
| | | data = datas[2].split(" - ")[1].strip() |
| | | contents = data.split("#") |
| | | code = contents[0] |
| | | use_time = int(contents[1].strip().split(":")[1]) |
| | | use_time = int(contents[1].strip().split(":")[1].split("-")[-2]) |
| | | l2_data = contents[2] |
| | | l2_data = eval(l2_data) |
| | | max_time_data = None |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | analyze_l2_data_transformation("D:\\logs\\huaxin_l2\\orderdetail.2023-08-24.log") |
| | | analyze_l2_data_transformation("D:\\logs\\huaxin_l2\\orderdetail.2023-08-29.log") |
| | |
| | | import concurrent.futures |
| | | import contextlib |
| | | import datetime |
| | | import hashlib |
| | | import io |
| | | import json |
| | | import logging |
| | | import mmap |
| | | import queue |
| | | import random |
| | | import socket |
| | |
| | | from code_attribute import gpcode_manager, code_volumn_manager |
| | | from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from huaxin_client import l1_subscript_codes_manager |
| | | from huaxin_client import l1_subscript_codes_manager, l2_data_transaction_protocol |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \ |
| | |
| | | code = data["code"] |
| | | timestamp = data.get("time") |
| | | datas = data["data"] |
| | | now_timestamp = int(time.time() * 1000) |
| | | async_log_util.info(hx_logger_l2_orderdetail, |
| | | f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{datas}") |
| | | l2_log.threadIds[code] = random.randint(0, 100000) |
| | | l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, datas) |
| | | TradeServerProcessor.l2_order(code, datas, timestamp) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | |
| | |
| | | is_normal = l2_data_util.load_l2_data(code, load_latest=False) |
| | | volume_rate = code_volumn_manager.get_volume_rate(code) |
| | | volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate) |
| | | m_val = L2PlaceOrderParamsManager(code, True, volume_rate, volume_rate_index, None).get_m_val()[0] |
| | | m_val = \ |
| | | L2PlaceOrderParamsManager(code, True, volume_rate, volume_rate_index, None).get_m_val()[ |
| | | 0] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | m_val_num = int(m_val / (float(limit_up_price) * 100)) |
| | | |
| | |
| | | datas = data["data"] |
| | | HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas) |
| | | |
@classmethod
def l2_order(cls, code, _datas, timestamp):
    """Log and process one batch of L2 order-detail data for ``code``.

    Args:
        code: Code the data belongs to.
        _datas: Order-detail records, passed on to
            ``L2TradeDataProcessor.process_huaxin``.
        timestamp: Millisecond timestamp attached by the sender; used to
            compute the elapsed time that is logged.
    """
    # Read the clock once so the logged elapsed time and the logged "now"
    # value always agree with each other.
    now_timestamp = int(time.time() * 1000)
    # NOTE(review): the ``<code>#耗时:<elapsed>-<now>#<data>`` layout looks like it
    # is parsed by log analysis tooling - keep it stable.
    async_log_util.info(hx_logger_l2_orderdetail,
                        f"{code}#耗时:{now_timestamp - timestamp}-{now_timestamp}#{_datas}")
    l2_log.threadIds[code] = random.randint(0, 100000)
    l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas)
| | | |
| | | |
| | | def clear_invalid_client(): |
| | | while True: |
| | |
| | | type_ = val["type"] |
| | | if type_ == "set_target_codes": |
| | | TradeServerProcessor.set_target_codes(val) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | l2_order_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20) |
| | | |
| | | |
def __recv_pipe_l2(pipe_l2):
    """Receive notifications from the L2 process and dispatch L2 order handling.

    Returns immediately when ``pipe_l2`` is ``None``. Otherwise blocks in a
    receive loop: each ``TYPE_L2_ORDER`` message is handed to
    ``l2_order_thread_pool``. The loop ends when the other end of the pipe is
    closed; any other error is logged and the loop continues.

    Args:
        pipe_l2: Connection to the L2 process, or ``None``.
    """

    def process_l2_order(code, request_id):
        """Read the payload for ``code`` from shared memory and acknowledge it."""
        try:
            # Read the data from shared memory
            with contextlib.closing(
                    mmap.mmap(-1, 1000 * 100,
                              tagname=l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code),
                              access=mmap.ACCESS_READ)) as m:
                s = m.read(1000 * 100)
            # The buffer is NUL padded; strip the padding before parsing.
            s = s.decode('utf-8').replace('\x00', '')
            hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):读取到共享内存数据", code, request_id)
            if not s:
                return
            print(len(s), s)
            data = json.loads(s)
            code = data["code"]
            timestamp = data.get("time")
            datas = data["data"]
            try:
                # NOTE: processing is currently disabled:
                # TradeServerProcessor.l2_order(code, datas, timestamp)
                pass
            finally:
                hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):数据处理完毕", code, request_id)
                l2_data_transaction_protocol.set_read_l2_order(pipe_l2, request_id)
        except Exception as e:
            # The future returned by submit() is never inspected, so an error
            # raised here would otherwise vanish without a trace.
            logging.exception(e)

    if pipe_l2 is None:
        return
    while True:
        try:
            val = pipe_l2.recv()
            if val:
                val = json.loads(val)
                print("收到来自L2的数据:", val["type"])
                # Handle the message
                type_ = val["type"]
                if type_ == l2_data_transaction_protocol.TYPE_L2_ORDER:
                    request_id = val["request_id"]
                    # Handle L2 data
                    code = val["data"]["code"]
                    hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):接受到来自L2客户端的数据", code, request_id)
                    l2_order_thread_pool.submit(process_l2_order, code, request_id)
        except EOFError:
            # The peer closed the pipe: recv() would raise again on every call,
            # so stop instead of spinning forever.
            logging.warning("L2 pipe closed by peer, stop receiving")
            break
        except Exception as e:
            logging.exception(e)
| | | |
| | |
| | | logger_debug.error(e) |
| | | |
| | | |
| | | def run(pipe_trade, pipe_l1): |
| | | def run(pipe_trade, pipe_l1, pipe_l2): |
| | | # 执行一些初始化数据 |
| | | block_info.init() |
| | | |