Administrator
2023-10-26 97ff6ae457b2e3356e7122a54cc437dc2e917c56
bug修复
3个文件已修改
46 ■■■■■ 已修改文件
code_attribute/code_volumn_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_volumn_manager.py
@@ -8,6 +8,7 @@
import json
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from utils import global_util, tool
from db import redis_manager_delegate as redis_manager
from log_module.log import logger_day_volumn
@@ -53,7 +54,7 @@
# 设置今日量
def set_today_volumn(code, volumn):
    logger_day_volumn.info("code:{} volumn:{}".format(code, volumn))
    async_log_util.info(logger_day_volumn, "code:{} volumn:{}".format(code, volumn))
    global_util.today_volumn[code] = volumn
    # 有1000手的变化才保存
    if code in __today_volumn_cache and volumn - __today_volumn_cache[code] < 100000:
@@ -66,7 +67,7 @@
def set_today_volumns(datas):
    for d in datas:
        code, volumn = d
        logger_day_volumn.info("code:{} volumn:{}".format(code, volumn))
        async_log_util.info(logger_day_volumn, "code:{} volumn:{}".format(code, volumn))
        global_util.today_volumn[code] = volumn
        # 有1000手的变化才保存
        if code in __today_volumn_cache and volumn - __today_volumn_cache[code] < 100000:
l2/huaxin/huaxin_target_codes_manager.py
@@ -11,6 +11,7 @@
from code_attribute import global_data_loader, code_volumn_manager, first_target_code_data_processor
from code_attribute.code_data_util import ZYLTGBUtil
from db import redis_manager_delegate as redis_manager
from log_module import async_log_util
from log_module.log import logger_l2_codes_subscript
from third_data import kpl_data_manager, kpl_api
from trade import current_price_process_manager
@@ -36,8 +37,8 @@
        l2_codes_queue.clear()
    @classmethod
    def push(cls, datas,request_id=None):
        l2_codes_queue.put_nowait((int(time.time()), datas,request_id))
    def push(cls, datas, request_id=None):
        l2_codes_queue.put_nowait((int(time.time()), datas, request_id))
        logger_l2_codes_subscript.info("加入L2代码处理队列:数量-{}", len(datas))
        # cls.__get_redis().lpush(cls.__L2_CODE_KEY, json.dumps())
@@ -63,7 +64,7 @@
    @classmethod
    def set_level_1_codes_datas(cls, datas, request_id=None):
        logger_l2_codes_subscript.info(f"({request_id})接受到L1的数据,开始预处理")
        async_log_util.info(logger_l2_codes_subscript, f"({request_id})接受到L1的数据,开始预处理")
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        # 订阅的代码
        flist = []
@@ -84,7 +85,10 @@
                continue
            # 获取自由流通市值
            if code not in global_util.zyltgb_map:
                __start_time = time.time()
                zylt = kpl_api.getZYLTAmount(code)
                async_log_util.info(logger_l2_codes_subscript,
                                    f"{request_id} {code}获取自由流通市值耗时-{round((time.time() - __start_time) * 1000)}ms")
                if zylt:
                    # 保存自由流通股本
                    ZYLTGBUtil.save_async(code, zylt // 10000, 1)
@@ -100,10 +104,10 @@
                     "zyltgb": zyltgb // 10000, "zyltgbUnit": 1}
            flist.append(fitem)
        code_volumn_manager.set_today_volumns(temp_volumns)
        logger_l2_codes_subscript.info(f"{request_id}接受到L1的数据,预处理完成")
        async_log_util.info(logger_l2_codes_subscript, f"{request_id}接受到L1的数据,预处理完成")
        try:
            tick_datas = first_target_code_data_processor.process_first_codes_datas(flist, request_id)
            current_price_process_manager.accept_prices(tick_datas,request_id)
            current_price_process_manager.accept_prices(tick_datas, request_id)
        except Exception as e:
            logging.exception(e)
trade/huaxin/huaxin_trade_api.py
@@ -5,6 +5,7 @@
import json
import logging
import multiprocessing
import queue
import random
import threading
import time
@@ -21,6 +22,18 @@
from utils import socket_util, huaxin_util, tool
# Thread pool (at most 15 worker threads) that response handling is submitted to.
__response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15)
# Unbounded FIFO queue of records waiting to be saved; drained by __run_save_data.
__save_data_queue = queue.Queue()
def __run_save_data():
    """Drain ``__save_data_queue`` forever, saving each record it yields.

    Runs as an endless loop and never returns, so it is meant to be used as
    the target of a background thread. Each iteration blocks until a record
    is available, passes it to
    ``huaxin_trade_record_manager.DelegateRecordManager.add_one`` and then
    pauses for 0.1 s before taking the next record.

    A failure while saving one record is logged and the loop carries on, so
    a single bad record cannot stop the consumer.
    """
    while True:
        try:
            # Blocks until a producer puts a record on the queue.
            data = __save_data_queue.get()
            huaxin_trade_record_manager.DelegateRecordManager.add_one(data)
        except Exception as e:
            # Best-effort persistence: keep the loop alive, but record the
            # failure instead of silently discarding it.
            logging.exception(e)
        finally:
            # Pause between records whether or not the save succeeded.
            time.sleep(0.1)
def __run_recv_queue_trade(queue: multiprocessing.Queue):
@@ -49,11 +62,6 @@
                        async_log_util.info(hx_logger_trade_callback,
                                            f"response:request_id-{request_id}")
                        __response_thread_pool.submit(__set_response, data_json)
                        if request_id.find("trade") > 0:
                            data = data_json["data"].get("data")
                            if data:
                                __response_thread_pool.submit(huaxin_trade_record_manager.DelegateRecordManager.add_one,
                                                              data)
                    elif type_ == "trade_callback":
                        try:
                            # 交易回调
@@ -75,7 +83,6 @@
                                direction = data.get("direction")
                                limitPrice = data.get("limitPrice")
                                volume = data.get("volume")
                                async_log_util.info(hx_logger_trade_callback, f"华鑫订单状态上报:code-{code} orderRef-{orderRef} orderSysID-{orderSysID} orderStatus-{orderStatus}")
                                is_shadow_order = False
                                # 获取涨停价
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -90,13 +97,9 @@
                                try:
                                    TradeResultProcessor.process_order(order)
                                finally:
                                    try:
                                        # 加入2次,增大加入成功率
                                        __response_thread_pool.submit(
                                            huaxin_trade_record_manager.DelegateRecordManager.add_one, data)
                                        async_log_util.info(hx_logger_trade_callback,
                                                            f"华鑫订单状态处理完成:code-{code} orderRef-{orderRef} orderSysID-{orderSysID} orderStatus-{orderStatus}")
                                        __save_data_queue.put_nowait(data)
                                    except Exception as e:
                                        hx_logger_trade_debug.exception(e)
@@ -120,6 +123,8 @@
    queue_strategy_w_trade_r = queue_strategy_w_trade_r_
    t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: __run_save_data(), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: CancelOrderManager().run(cancel_order), daemon=True)
    t1.start()