code_attribute/code_volumn_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2/huaxin/huaxin_target_codes_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
trade/huaxin/huaxin_trade_api.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
code_attribute/code_volumn_manager.py
@@ -8,6 +8,7 @@ import json from db.redis_manager_delegate import RedisUtils from log_module import async_log_util from utils import global_util, tool from db import redis_manager_delegate as redis_manager from log_module.log import logger_day_volumn @@ -53,7 +54,7 @@ # 设置今日量 def set_today_volumn(code, volumn): logger_day_volumn.info("code:{} volumn:{}".format(code, volumn)) async_log_util.info(logger_day_volumn, "code:{} volumn:{}".format(code, volumn)) global_util.today_volumn[code] = volumn # 有1000手的变化才保存 if code in __today_volumn_cache and volumn - __today_volumn_cache[code] < 100000: @@ -66,7 +67,7 @@ def set_today_volumns(datas): for d in datas: code, volumn = d logger_day_volumn.info("code:{} volumn:{}".format(code, volumn)) async_log_util.info(logger_day_volumn, "code:{} volumn:{}".format(code, volumn)) global_util.today_volumn[code] = volumn # 有1000手的变化才保存 if code in __today_volumn_cache and volumn - __today_volumn_cache[code] < 100000: l2/huaxin/huaxin_target_codes_manager.py
@@ -11,6 +11,7 @@ from code_attribute import global_data_loader, code_volumn_manager, first_target_code_data_processor from code_attribute.code_data_util import ZYLTGBUtil from db import redis_manager_delegate as redis_manager from log_module import async_log_util from log_module.log import logger_l2_codes_subscript from third_data import kpl_data_manager, kpl_api from trade import current_price_process_manager @@ -63,7 +64,7 @@ @classmethod def set_level_1_codes_datas(cls, datas, request_id=None): logger_l2_codes_subscript.info(f"({request_id})接受到L1的数据,开始预处理") async_log_util.info(logger_l2_codes_subscript, f"({request_id})接受到L1的数据,开始预处理") yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() # 订阅的代码 flist = [] @@ -84,7 +85,10 @@ continue # 获取自由流通市值 if code not in global_util.zyltgb_map: __start_time = time.time() zylt = kpl_api.getZYLTAmount(code) async_log_util.info(logger_l2_codes_subscript, f"{request_id} {code}获取自由流通市值耗时-{round((time.time() - __start_time) * 1000)}ms") if zylt: # 保存自由流通股本 ZYLTGBUtil.save_async(code, zylt // 10000, 1) @@ -100,7 +104,7 @@ "zyltgb": zyltgb // 10000, "zyltgbUnit": 1} flist.append(fitem) code_volumn_manager.set_today_volumns(temp_volumns) logger_l2_codes_subscript.info(f"{request_id}接受到L1的数据,预处理完成") async_log_util.info(logger_l2_codes_subscript, f"{request_id}接受到L1的数据,预处理完成") try: tick_datas = first_target_code_data_processor.process_first_codes_datas(flist, request_id) current_price_process_manager.accept_prices(tick_datas,request_id) trade/huaxin/huaxin_trade_api.py
@@ -5,6 +5,7 @@ import json import logging import multiprocessing import queue import random import threading import time @@ -21,6 +22,18 @@ from utils import socket_util, huaxin_util, tool __response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15) __save_data_queue = queue.Queue() def __run_save_data(): while True: try: data = __save_data_queue.get() huaxin_trade_record_manager.DelegateRecordManager.add_one(data) except: pass finally: time.sleep(0.1) def __run_recv_queue_trade(queue: multiprocessing.Queue): @@ -49,11 +62,6 @@ async_log_util.info(hx_logger_trade_callback, f"response:request_id-{request_id}") __response_thread_pool.submit(__set_response, data_json) if request_id.find("trade") > 0: data = data_json["data"].get("data") if data: __response_thread_pool.submit(huaxin_trade_record_manager.DelegateRecordManager.add_one, data) elif type_ == "trade_callback": try: # 交易回调 @@ -75,7 +83,6 @@ direction = data.get("direction") limitPrice = data.get("limitPrice") volume = data.get("volume") async_log_util.info(hx_logger_trade_callback, f"华鑫订单状态上报:code-{code} orderRef-{orderRef} orderSysID-{orderSysID} orderStatus-{orderStatus}") is_shadow_order = False # 获取涨停价 limit_up_price = gpcode_manager.get_limit_up_price(code) @@ -90,13 +97,9 @@ try: TradeResultProcessor.process_order(order) finally: try: # 加入2次,增大加入成功率 __response_thread_pool.submit( huaxin_trade_record_manager.DelegateRecordManager.add_one, data) async_log_util.info(hx_logger_trade_callback, f"华鑫订单状态处理完成:code-{code} orderRef-{orderRef} orderSysID-{orderSysID} orderStatus-{orderStatus}") __save_data_queue.put_nowait(data) except Exception as e: hx_logger_trade_debug.exception(e) @@ -120,6 +123,8 @@ queue_strategy_w_trade_r = queue_strategy_w_trade_r_ t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True) t1.start() t1 = threading.Thread(target=lambda: __run_save_data(), daemon=True) t1.start() t1 = threading.Thread(target=lambda: CancelOrderManager().run(cancel_order), daemon=True) t1.start()