""" 华鑫交易数据更新 """ import logging import queue import threading import time from log_module import async_log_util from log_module.log import hx_logger_trade_debug, logger_system, logger_debug from trade import trade_manager, trade_data_manager from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager from utils import huaxin_util import concurrent.futures trade_data_request_queue = queue.Queue(maxsize=1000) __process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) # 主动更新数据 def __read_update_task_queue(): logger_system.info("启动读取交易数据更新队列") while True: try: data = trade_data_request_queue.get() if data: type_ = data["type"] delay = data.get("delay") if delay and delay > 0: time.sleep(delay) async_log_util.info(hx_logger_trade_debug, f"获取交易数据开始:{type_}") try: if type_ == "delegate_list": dataJSON = huaxin_trade_api.get_delegate_list(can_cancel=False, timeout=10) if dataJSON["code"] == 0: data = dataJSON["data"] __process_thread_pool.submit(huaxin_trade_record_manager.DelegateRecordManager.add, data) elif type_ == "money": dataJSON = huaxin_trade_api.get_money() if dataJSON["code"] == 0: data = dataJSON["data"] huaxin_trade_record_manager.MoneyManager.save_data(data) if data: usefulMoney = data[0]["usefulMoney"] commission = data[0]["commission"] # 设置可用资金 trade_data_manager.AccountMoneyManager().set_available_money(0, usefulMoney) trade_data_manager.AccountMoneyManager().set_commission(commission) # 设置可用资金 elif type_ == "deal_list": dataJSON = huaxin_trade_api.get_deal_list(timeout=10) print("成交响应:", dataJSON) if dataJSON["code"] == 0: datas = dataJSON["data"] if datas is None: datas = [] try: buy_deal_codes = set() for d in datas: if str(d['direction']) == str(huaxin_util.TORA_TSTP_D_Buy): buy_deal_codes.add(d['securityID']) except Exception as e: logger_debug.exception(e) huaxin_trade_record_manager.DealRecordManager.add(datas) if datas: tempList = [ {"time": d["tradeTime"], "type": int(d['direction']), "code": d['securityID']} for d in datas] try: trade_manager.process_trade_success_data(tempList) except Exception as e: logging.exception(e) # 持仓股 elif type_ == "position_list": dataJSON = huaxin_trade_api.get_position_list() if dataJSON["code"] == 0: datas = dataJSON["data"] huaxin_trade_record_manager.PositionManager.cache(datas) __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, datas) async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}") except Exception as e1: # if str(e1).find("超时") >= 0: # # 读取结果超时需要重新请求 # trade_data_request_queue.put_nowait({"type": type_}) raise e1 except Exception as e: hx_logger_trade_debug.exception(e) finally: # 有0.1s的间隔 time.sleep(0.01) def get_request_queue_size(): """ 获取请求队列的大小 @return: """ return trade_data_request_queue.qsize() def repaire_task(): """ 任务修复 @return: """ queue_size = get_request_queue_size() if queue_size < 2: return threading.Thread(target=lambda: __read_update_task_queue(), daemon=True).start() def __add_data(data): trade_data_request_queue.put_nowait(data) def add_delegate_list(source, delay=0): __add_data({"type": "delegate_list", "delay": delay}) async_log_util.info(hx_logger_trade_debug, f"请求委托列表,来源:{source}") def add_deal_list(): __add_data({"type": "deal_list"}) def add_money_list(delay=0): __add_data({"type": "money", "delay": delay}) def add_position_list(): __add_data({"type": "position_list"}) # 运行 def run(): t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True) t1.start()