"""
|
华鑫交易数据更新
|
"""
|
import json
|
import queue
|
import threading
|
import time
|
|
import constant
|
from code_atrribute import gpcode_manager
|
from code_atrribute.history_k_data_util import HistoryKDatasUtils
|
from code_atrribute.position_code_data_manager import PositionCodeDataManager
|
from log_module import async_log_util
|
from log_module.log import hx_logger_trade_debug, logger_system, printlog
|
from trade import huaxin_trade_api, huaxin_trade_record_manager
|
|
import concurrent.futures
|
|
from utils import tool
|
|
trade_data_request_queue = queue.Queue()
|
__process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
|
|
|
# 主动更新数据
|
def __read_update_task_queue():
|
logger_system.info("启动读取交易数据更新队列")
|
while True:
|
try:
|
data = trade_data_request_queue.get()
|
if data:
|
type_ = data["type"]
|
delay = data.get("delay")
|
if delay and delay > 0:
|
time.sleep(delay)
|
async_log_util.info(hx_logger_trade_debug, f"获取交易数据开始:{type_}")
|
try:
|
# 持仓股
|
if type_ == "position_list":
|
dataJSON = huaxin_trade_api.get_position_list()
|
async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}")
|
if dataJSON["code"] == 0:
|
datas = dataJSON["data"]
|
huaxin_trade_record_manager.PositionManager.cache(datas)
|
# 获取持仓股的涨停价
|
position_codes = set()
|
# position_codes.add("113565")
|
need_update_codes = []
|
for d in datas:
|
if (d["prePosition"] > 0 and tool.is_can_sell_code(d["securityID"])) or d[
|
"todayBSPos"] > 0:
|
# 订阅以往持仓和今日买入
|
position_codes.add(d["securityID"])
|
# 获取昨日收盘价
|
if not gpcode_manager.get_price_pre_cache(d["securityID"]):
|
# 获取昨日的收盘价
|
need_update_codes.append(d["securityID"])
|
try:
|
PositionCodeDataManager.request_pre_volume(d["securityID"])
|
except:
|
pass
|
if need_update_codes:
|
gpcode_manager.request_price_pre(need_update_codes)
|
async_log_util.info(hx_logger_trade_debug, f"获取收盘价:{type_}")
|
queue_l1_trade_r_strategy_w.put_nowait(
|
{"type": "set_target_codes", "data": list(position_codes)})
|
constant.SUBSCRIPT_L2_CODES |= position_codes
|
queue_strategy_w_l2_r.put_nowait(json.dumps(
|
{"type": "l2_cmd", "data": list(constant.SUBSCRIPT_L2_CODES)}))
|
# 9点25之前需要订阅持仓票
|
__process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, datas)
|
async_log_util.info(hx_logger_trade_debug, f"获取交易数据结束:{type_}")
|
except Exception as e1:
|
# if str(e1).find("超时") >= 0:
|
# # 读取结果超时需要重新请求
|
# trade_data_request_queue.put_nowait({"type": type_})
|
raise e1
|
except Exception as e:
|
hx_logger_trade_debug.exception(e)
|
finally:
|
# 有0.1s的间隔
|
time.sleep(0.01)
|
|
|
def __add_data(data):
|
trade_data_request_queue.put_nowait(data)
|
|
|
def add_delegate_list(source, delay=0):
|
__add_data({"type": "delegate_list", "delay": delay})
|
async_log_util.info(hx_logger_trade_debug, f"请求委托列表,来源:{source}")
|
|
|
def add_deal_list():
|
__add_data({"type": "deal_list"})
|
|
|
def add_money_list(delay=0):
|
__add_data({"type": "money", "delay": delay})
|
|
|
def add_position_list():
|
__add_data({"type": "position_list"})
|
|
|
def test():
|
time.sleep(2)
|
position_codes = ["000333"]
|
queue_l1_trade_r_strategy_w.put_nowait(
|
{"type": "set_target_codes", "data": list(position_codes)})
|
|
|
# 运行
|
def run(queue_l1_trade_r_strategy_w_, queue_strategy_w_l2_r_):
|
global queue_l1_trade_r_strategy_w, queue_strategy_w_l2_r
|
queue_l1_trade_r_strategy_w = queue_l1_trade_r_strategy_w_
|
queue_strategy_w_l2_r = queue_strategy_w_l2_r_
|
t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
|
t1.start()
|
# threading.Thread(target=lambda: test(), daemon=True).start()
|
|
|
if __name__ == "__main__":
|
printlog(HistoryKDatasUtils.get_gp_latest_info(["000333"]))
|