"""
|
华鑫交易数据更新
|
"""
|
import json
|
import logging
|
import queue
|
import threading
|
import time
|
|
from code_attribute import gpcode_manager
|
from huaxin_client import constant as huaxin_client_constant
|
from log_module import async_log_util
|
from log_module.log import hx_logger_trade_debug, logger_system
|
from records import huaxin_trade_record_manager
|
from trade import huaxin_trade_api
|
from trade.trade_manager import CodeTradeStateManager
|
|
from utils import huaxin_util, tool
|
import concurrent.futures
|
|
# Refresh requests waiting to be picked up by the consumer loop (unbounded FIFO).
trade_data_request_queue = queue.Queue(maxsize=0)

# Pool the consumer loop submits the record managers' ``add`` calls to.
__process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
|
|
|
# Actively refresh trade data: one private handler per request type, a dispatch
# table, and the queue-consumer loop that drives them.


def __collect_cancelable_codes(orders):
    """
    Collect the security IDs of the orders that can still be cancelled.

    An order is treated as a shadow order, and skipped, when its volume equals
    ``huaxin_client_constant.SHADOW_ORDER_VOLUME`` and its limit price differs
    from the cached limit-up price of the code by at least 0.01.

    :param orders: iterable of order dicts; each must carry "securityID" and
        "orderStatus", and may carry "limitPrice" and "volume"
    :return: list of security IDs (one entry per cancelable order)
    """
    codes = []
    for order in orders:
        code = order["securityID"]
        order_status = order["orderStatus"]
        limit_price = order.get("limitPrice")
        volume = order.get("volume")
        # Look up the limit-up price to decide whether this is a shadow order.
        limit_up_price = gpcode_manager.get_limit_up_price_cache(code)
        is_shadow_order = (
            bool(limit_up_price)
            and volume == huaxin_client_constant.SHADOW_ORDER_VOLUME
            # A record without a limit price cannot be compared; do not crash on it.
            and limit_price is not None
            and abs(float(limit_price) - float(limit_up_price)) >= 0.01
        )
        if is_shadow_order:
            continue
        if huaxin_util.is_can_cancel(order_status):
            codes.append(code)
    return codes


def __update_delegate_list():
    """
    Fetch the delegate (order) list and hand it to ``DelegateRecordManager.add``.

    Nothing is done when the API response code is not 0.
    """
    response = huaxin_trade_api.get_delegate_list(can_cancel=False, timeout=10)
    if response["code"] != 0:
        return
    orders = response["data"]
    __process_thread_pool.submit(huaxin_trade_record_manager.DelegateRecordManager.add, orders)
    if orders:
        # NOTE(review): the cancelable codes are computed but not consumed; the
        # original code ended in `if codes: pass`. Kept so the hook stays in
        # place; confirm whether it can be dropped.
        __collect_cancelable_codes(orders)


def __update_money():
    """
    Fetch the account money data and store it through ``MoneyManager.save_data``.

    Nothing is done when the API response code is not 0.
    """
    response = huaxin_trade_api.get_money()
    if response["code"] != 0:
        return
    # NOTE: pushing the available money to the account manager is disabled here
    # (it was commented out in the original code).
    huaxin_trade_record_manager.MoneyManager.save_data(response["data"])


def __update_deal_list():
    """
    Fetch the deal list, flag bought codes and record the deals.

    Every deal with a positive volume and direction 0 marks its code as
    ``TRADE_STATE_ALREADY_BUY`` so that a code already dealt today is not
    ordered again. Nothing is recorded when the API response code is not 0.
    """
    response = huaxin_trade_api.get_deal_list(timeout=10)
    # Log the raw response through the trade debug logger instead of stdout.
    async_log_util.info(hx_logger_trade_debug, f"成交响应:{response}")
    if response["code"] != 0:
        return
    deals = response["data"]
    for deal in deals:
        if deal["volume"] > 0 and int(deal["direction"]) == 0:
            CodeTradeStateManager().set_trade_state(deal["securityID"], 0,
                                                    CodeTradeStateManager.TRADE_STATE_ALREADY_BUY)
    huaxin_trade_record_manager.DealRecordManager.add(deals)


def __update_position_list():
    """
    Fetch the position list, cache it and hand it to ``PositionManager.add``.

    ``PositionManager.cache`` is called on the consumer thread; ``add`` is
    submitted to the thread pool. Nothing is done when the response code is not 0.
    """
    response = huaxin_trade_api.get_position_list()
    if response["code"] != 0:
        return
    positions = response["data"]
    huaxin_trade_record_manager.PositionManager.cache(positions)
    __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, positions)


# Request "type" value -> handler that performs the refresh.
__UPDATE_HANDLERS = {
    "delegate_list": __update_delegate_list,
    "money": __update_money,
    "deal_list": __update_deal_list,
    "position_list": __update_position_list,
}


def __read_update_task_queue():
    """
    Consume ``trade_data_request_queue`` forever and refresh the requested data.

    Each request is a dict with a "type" key ("delegate_list", "money",
    "deal_list" or "position_list") and an optional "delay" in seconds that is
    slept before the fetch. A request with any other type only produces the
    start/success log lines. Any exception raised while handling a request is
    logged to ``hx_logger_trade_debug`` and the loop carries on; a failed
    request is not re-queued.

    :return: never returns
    """
    logger_system.info("启动读取交易数据更新队列")
    while True:
        try:
            request = trade_data_request_queue.get()
            if not request:
                continue
            type_ = request["type"]
            delay = request.get("delay")
            if delay and delay > 0:
                time.sleep(delay)
            async_log_util.info(hx_logger_trade_debug, f"获取交易数据开始:{type_}")
            handler = __UPDATE_HANDLERS.get(type_)
            if handler is not None:
                handler()
            async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}")
        except Exception as e:
            hx_logger_trade_debug.exception(e)
        finally:
            # Pause 0.01s between two requests (also after a failure or an empty item).
            time.sleep(0.01)
|
|
|
def __add_data(data):
    """
    Put one refresh request on ``trade_data_request_queue`` without blocking.

    :param data: request dict to enqueue
    :return: None
    """
    trade_data_request_queue.put(data, block=False)
|
|
|
def add_delegate_list(source, delay=0):
    """
    Queue a refresh of the delegate (order) list and log who asked for it.

    :param source: label of the requester; only written to the trade debug log
    :param delay: value stored under the request's "delay" key
    :return: None
    """
    request = {"type": "delegate_list", "delay": delay}
    __add_data(request)
    async_log_util.info(hx_logger_trade_debug, f"请求委托列表,来源:{source}")
|
|
|
def add_deal_list():
    """
    Queue a refresh of the deal list.

    :return: None
    """
    request = {"type": "deal_list"}
    __add_data(request)
|
|
|
def add_money_list(delay=0):
    """
    Queue a refresh of the account money data.

    :param delay: value stored under the request's "delay" key
    :return: None
    """
    request = {"type": "money", "delay": delay}
    __add_data(request)
|
|
|
def add_position_list():
    """
    Queue a refresh of the position list.

    :return: None
    """
    request = {"type": "position_list"}
    __add_data(request)
|
|
|
# Run
def run():
    """
    Start the consumer of the trade-data refresh queue in a daemon thread.

    The call returns as soon as the thread has been started.

    :return: None
    """
    consumer = threading.Thread(target=__read_update_task_queue, daemon=True)
    consumer.start()
|