Administrator
2023-09-18 0ced5496fcfcb9de9927793023e8692fe792b477
bug修复
5个文件已修改
207 ■■■■ 已修改文件
huaxin_client/command_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_huaxin.py 80 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -100,9 +100,9 @@
            if _type == CLIENT_TYPE_TRADE:
                # 交易
                ctype = data["trade_type"]
                async_log_util.info(logger_trade, f"交易开始")
                async_log_util.info(logger_trade, f"交易开始:{request_id}")
                cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data)
                async_log_util.info(logger_trade, f"交易结束")
                async_log_util.info(logger_trade, f"交易结束:{request_id}")
            elif _type == CLIENT_TYPE_MONEY:
                cls.action_callback.OnMoney(client_id, request_id, sk)
            elif _type == CLIENT_TYPE_DEAL_LIST:
test/test.py
@@ -1,22 +1,39 @@
import multiprocessing
import queue
import threading
import time
from huaxin_client import l2_data_manager
from log_module import async_log_util
from log_module.log import logger_debug
__queue = queue.Queue()
def start_thread():
    print(threading.current_thread().getName(), threading.current_thread().ident)
def add_data(msg):
    time.sleep(1)
    start_time = time.time()
    __queue.put({"msg": msg})
    end_time = time.time()
    if end_time - start_time > 0.002:
        print("加入日志耗时")
def test_process_1(pipe):
    while True:
        for i in range(10):
            pipe.send_bytes(f"hello world:{i}".encode("utf-8"))
            time.sleep(1)
def test_process_2(pipe):
    while True:
        results = pipe.recv_bytes()
        if results:
            print("接受到内容:", results)
if __name__ == "__main__":
    _queue = queue.Queue()
    _queue.put("123123")
    print(_queue.get(False))
    print(_queue.get(False))
    p1, p2 = multiprocessing.Pipe()
    # L1订阅数据
    progress1 = multiprocessing.Process(target=lambda: test_process_1(p1))
    progress2 = multiprocessing.Process(target=lambda: test_process_2(p2))
    progress1.start()
    progress2.start()
    input()
trade/huaxin/huaxin_trade_api.py
@@ -256,7 +256,7 @@
            trade_cmd_callback(TradeRequest(_type, root_data, request_id))
        else:
            start_time = time.time()
            pipe_trade.send(json.dumps(root_data).encode("utf-8"))
            pipe_trade.send_bytes(json.dumps(root_data).encode("utf-8"))
            use_time = int((time.time() - start_time)*1000)
            if use_time > 10:
                async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}")
trade/trade_huaxin.py
@@ -34,47 +34,51 @@
# 通过量下单,返回(代码,账号ID,订单号)
def order_volume(code, price, count, last_data_index, order_ref=None):
    async_log_util.info(logger_trade, f"{code}下单方法开始")
    price = round(float(price), 2)
    if code.find("00") != 0 and code.find("60") != 0:
        raise Exception("只支持00开头与60开头的代码下单")
    # 保存下单信息
    huaxin.huaxin_delegate_postion_manager.place_order(code, price, count, last_data_index)
    if not constant.TRADE_ENABLE:
        return
    result = None
    blocking = False
    async_log_util.info(logger_trade, f"{code} trade_huaxin.order_volume 开始")
    try:
        async_log_util.info(logger_trade, f"{code}下单开始")
        result = huaxin_trade_api.order(1, code, count, price, blocking=blocking, order_ref=order_ref)
        async_log_util.info(logger_trade, f"{code}下单结束")
    except Exception as e:
        if str(e).find("超时") >= 0:
            # 此处出现超时,需要通过读取委托列表来设置订单信息
            async_log_util.error(hx_logger_trade_debug, f"{code}:下单结果反馈出错-{str(e)}")
        else:
            raise e
    if result:
        if blocking:
            if result['code'] == 0:
                result = result["data"]
                if result["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected:
                    async_log_util.info(hx_logger_trade_debug, f"{code}:下单失败:{result.get('statusMsg')}")
                    raise Exception(result.get('statusMsg'))
                else:
                    __TradeOrderIdManager.add_order_id(code, result["accountID"], result["orderSysID"])
                    async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 orderSysID:{result['orderSysID']}")
                    return result["securityId"], result["accountID"], result["orderSysID"]
        price = round(float(price), 2)
        if code.find("00") != 0 and code.find("60") != 0:
            raise Exception("只支持00开头与60开头的代码下单")
        # 保存下单信息
        huaxin.huaxin_delegate_postion_manager.place_order(code, price, count, last_data_index)
        if not constant.TRADE_ENABLE:
            return
        result = None
        blocking = False
        try:
            async_log_util.info(logger_trade, f"{code}下单开始")
            result = huaxin_trade_api.order(1, code, count, price, blocking=blocking, order_ref=order_ref)
            async_log_util.info(logger_trade, f"{code}下单结束")
        except Exception as e:
            if str(e).find("超时") >= 0:
                # 此处出现超时,需要通过读取委托列表来设置订单信息
                async_log_util.error(hx_logger_trade_debug, f"{code}:下单结果反馈出错-{str(e)}")
            else:
                raise Exception(result['msg'])
                raise e
        if result:
            if blocking:
                if result['code'] == 0:
                    result = result["data"]
                    if result["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected:
                        async_log_util.info(hx_logger_trade_debug, f"{code}:下单失败:{result.get('statusMsg')}")
                        raise Exception(result.get('statusMsg'))
                    else:
                        __TradeOrderIdManager.add_order_id(code, result["accountID"], result["orderSysID"])
                        async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 orderSysID:{result['orderSysID']}")
                        return result["securityId"], result["accountID"], result["orderSysID"]
                else:
                    raise Exception(result['msg'])
            else:
                order_ref = result["order_ref"]
                __TradeOrderIdManager.add_order_ref(code, order_ref)
                async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 orderRef:{order_ref}")
                return code, "local", order_ref
        else:
            order_ref = result["order_ref"]
            __TradeOrderIdManager.add_order_ref(code, order_ref)
            async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 orderRef:{order_ref}")
            return code, "local", order_ref
    else:
        raise Exception("下单失败,无返回")
            raise Exception("下单失败,无返回")
    finally:
        async_log_util.info(logger_trade, f"{code} trade_huaxin.order_volume 结束")
def order_success(code, accountId, orderSysID, orderRef, insertTime):
trade/trade_manager.py
@@ -5,11 +5,10 @@
# 交易管理器
import datetime
import json
import random
import threading
import time
import dask
import concurrent.futures
from code_attribute import gpcode_manager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
@@ -393,20 +392,17 @@
# 开始交易
def start_buy(code, capture_timestamp, last_data, last_data_index):
    # @dask.delayed
    def is_forbidden(code):
        if l2_trade_util.is_in_forbidden_trade_codes(code):
            return Exception("禁止交易")
        return None, None
    # @dask.delayed
    def is_state_right(code):
        trade_state = __CodesTradeStateManager.get_trade_state_cache(code)
        if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
            return Exception("代码处于不可交易状态"), trade_state
        return None, trade_state
    # @dask.delayed
    def is_money_enough(code):
        money = AccountAvailableMoneyManager().get_available_money_cache()
        if money is None:
@@ -419,32 +415,25 @@
            return Exception("账户可用资金不足"), price
        return None, price
    # @dask.delayed
    def can_trade(*args):
        for arg in args:
            if arg[0] is not None:
                return arg[0], None, None
        return None, args[1][1], args[2][1]
    _start_time = tool.get_now_timestamp()
    ex = is_forbidden(code)[0]
    if ex:
        raise ex
    ex, trade_state = is_state_right(code)
    if ex:
        raise ex
    ex, price = is_money_enough(code)
    if ex:
        raise ex
    print("开始买入")
    async_log_util.info(logger_trade, "{}开始买入".format(code))
    __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
    # 状态改变过后必须要有本地下单编号
    # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "买入判断时间", force=True)
    __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
    # l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "异步买入时间", force=True)
    async_log_util.info(logger_trade, "{} trade.manager.start_buy 开始".format(code))
    try:
        try:
            ex = is_forbidden(code)[0]
            if ex:
                raise ex
            ex, trade_state = is_state_right(code)
            if ex:
                raise ex
            ex, price = is_money_enough(code)
            if ex:
                raise ex
        finally:
            async_log_util.info(logger_trade, "{} trade.manager.start_buy 判断是否可买".format(code))
        __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
        # 状态改变过后必须要有本地下单编号
        __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
    finally:
        async_log_util.info(logger_trade, "{} trade.manager.start_buy 结束".format(code))
# 中断买入
@@ -455,7 +444,9 @@
# 购买
# @tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
    async_log_util.info(logger_trade, "{} trade_manager.__buy 开始".format(code))
    try:
        if constant.API_TRADE_ENABLE:
            count = (constant.BUY_MONEY_PER_CODE // int(round(float(price) * 100))) * 100
            # 最低下单1手
@@ -475,6 +466,8 @@
        logger_trade.error("{}买入异常{}".format(code, str(e)))
        logger_trade.exception(e)
        raise e
    finally:
        async_log_util.info(logger_trade, "{} trade_manager.__buy 结束".format(code))
# 下单成功
@@ -501,20 +494,20 @@
__CodesTradeStateManager = CodesTradeStateManager()
__cancel_order_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# 开始取消买入
def start_cancel_buy(code, force=False):
    async_log_util.info(logger_trade, "{}进入撤单方法".format(code))
    async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 开始".format(code))
    trade_state = __CodesTradeStateManager.get_trade_state_cache(code)
    if trade_state == TRADE_STATE_BUY_SUCCESS:
        return None
    if not force:
        if trade_state != TRADE_STATE_BUY_PLACE_ORDER and trade_state != TRADE_STATE_BUY_DELEGATED:
            return None
    try:
        async_log_util.info(logger_trade, "{}开始撤单".format(code))
        if trade_state == TRADE_STATE_BUY_SUCCESS:
            return None
        if not force:
            if trade_state != TRADE_STATE_BUY_PLACE_ORDER and trade_state != TRADE_STATE_BUY_DELEGATED:
                return None
        __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING)
        async_log_util.info(logger_trade, "{}撤单方法开始".format(code))
        if constant.API_TRADE_ENABLE:
            if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN:
                trade_juejin.cancel_order(code)
@@ -522,11 +515,10 @@
                trade_huaxin.cancel_order(code)
        else:
            guiTrade.cancel_buy(code)
        async_log_util.info(logger_trade, "{}撤单方法结束".format(code))
        __cancel_success(code)
        # 再次撤单
        if constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN:
            threading.Thread(target=lambda: trade_huaxin.cancel_order(code, msg="再次撤单"), daemon=True).start()
            __cancel_order_thread_pool.submit(lambda: trade_huaxin.cancel_order(code, msg="再次撤单"))
        # 不需要再次撤单了
        # try:
        #     cancel_buy_again(code)
@@ -535,9 +527,10 @@
    except Exception as e:
        # 状态还原
        CodesTradeStateManager().set_trade_state(code, trade_state)
        logger_trade.error("{}撤单异常:{}".format(code, str(e)))
        async_log_util.error(logger_trade, "{}撤单异常:{}".format(code, str(e)))
        raise e
    async_log_util.info(logger_trade, "{}撤单完毕".format(code))
    finally:
        async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 结束".format(code))
# 再次撤单,防止没有撤掉
@@ -664,9 +657,6 @@
            RedisUtils.delete(redis_l2, k, auto_free=False)
    finally:
        RedisUtils.realse(redis_l2)
if __name__ == "__main__":