Administrator
2024-05-21 f8aa6f5fab0b354a06029237e325cda7dbd88a53
bug修复
2个文件已添加
2个文件已修改
201 ■■■■■ 已修改文件
main.py 125 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
records/huaxin_trade_record_manager.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin_sell_util.py 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/sell_util.py 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py
@@ -1,18 +1,58 @@
"""
可转债入口函数
"""
import json
import logging
import multiprocessing
import threading
import time
from code_attribute import target_codes_manager
from code_attribute import target_codes_manager, gpcode_manager
from huaxin_client import l2_client_for_cb, trade_client_for_cb
from huaxin_client.client_network import SendResponseSkManager
from log_module import async_log_util
from trade import huaxin_trade_api, huaxin_trade_data_update
from utils import middle_api_protocol, outside_api_command_manager, constant
from records import huaxin_trade_record_manager
from trade import huaxin_trade_api, huaxin_trade_data_update, huaxin_sell_util
from utils import middle_api_protocol, outside_api_command_manager, constant, tool, huaxin_util, socket_util, sell_util
constant.LOG_DIR = "logs_cb"
from log_module.log import logger_debug, logger_trade
from log_module.log import logger_debug, logger_trade, printlog
import concurrent.futures
__cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
def __send_response(data_bytes):
    sk = SendResponseSkManager.create_send_response_sk(addr=middle_api_protocol.SERVER_HOST,
                                                       port=middle_api_protocol.SERVER_PORT)
    try:
        data_bytes = socket_util.load_header(data_bytes)
        sk.sendall(data_bytes)
        result, header_str = socket_util.recv_data(sk)
        result = json.loads(result)
        if result["code"] != 0:
            raise Exception(result['msg'])
    finally:
        sk.close()
def send_response(data, _client_id, _request_id):
    data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id,
                             "request_id": _request_id}).encode('utf-8')
    for i in range(3):
        try:
            __send_response(data_bytes)
            printlog("发送数据成功")
            break
        except Exception as e1:
            logging.exception(e1)
# 撤长期没有成交的单
def __cancel_not_deal_order(code, order_ref, timeout=3):
    time.sleep(timeout)
    # 撤买单
    huaxin_trade_api.cancel_order(1, code, "", orderRef=order_ref)
def command_callback(client_id, request_id, data):
@@ -25,20 +65,81 @@
    """
    type_ = data.get('type')
    if type_ == outside_api_command_manager.API_TYPE_TRADE:
        # 交易
        pass
        try:
            trade_type = data["trade_type"]
            if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER:
                code = data["code"]
                direction = data["direction"]
                volume = data["volume"]
                price_type = data["price_type"]
                price = data["price"]
                sinfo = data["sinfo"]
                if direction == 2:
                    # price_type: 0-价格笼子 1-跌停价  2-涨停价 3-现价 4-买5价
                    async_log_util.info(logger_trade, f"API卖: 接收数据-{data}")
                    current_price = None  # L1DataProcessor.get_l1_current_price(code)
                    limit_down_price = target_codes_manager.get_limit_down_price(code)
                    limit_up_price = target_codes_manager.get_limit_up_price(code)
                    order_ref = huaxin_util.create_order_ref()
                    try:
                        result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price,
                                                             limit_down_price,
                                                             current_price, blocking=True, request_id=request_id,
                                                             order_ref=order_ref)
                        async_log_util.info(logger_trade, f"API卖结果: {result}")
                        send_response(result, client_id, request_id)
                    except Exception as e:
                        if str(e).find("超时") >= 0:
                            send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id)
                        else:
                            raise e
                else:
                    if not price:
                        limit_down_price = target_codes_manager.get_limit_down_price(code)
                        limit_up_price = target_codes_manager.get_limit_up_price(code)
                        price = sell_util.get_sell_price(price_type, limit_up_price, limit_down_price, None)
                        if not price:
                            raise Exception("尚未获取到买入价格")
                        # 获取买1金额
                        price = round(float(price), 3)
                        order_ref = huaxin_util.create_order_ref()
                        result = huaxin_trade_api.order(direction, code, volume, price,
                                                        sinfo=sinfo, order_ref=order_ref,
                                                        blocking=True, request_id=request_id)
                        # 2s内没成交就撤单
                        __cancel_sell_thread_pool.submit(__cancel_not_deal_order, code, order_ref)
                    else:
                        result = huaxin_trade_api.order(direction, code, volume, price,
                                                        sinfo=sinfo,
                                                        blocking=True, request_id=request_id)
                    send_response({"code": 0, "data": result}, client_id, request_id)
        except Exception as e:
            logger_debug.exception(e)
            send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    elif type_ == "get_code_position_info":
        # 查询此仓
        pass
    elif type_ == "get_code_position_info":
        # 查询此仓
        pass
        code = data.get("code")
        results = huaxin_trade_record_manager.PositionManager().list_by_day(tool.get_now_date_str("%Y%m%d"), code)
        send_response({"code": 0, "data": results}, client_id, request_id)
    elif type_ == "refresh_trade_data":
        # 刷新交易数据
        ctype = data.get("ctype")
        if ctype == "money":
            huaxin_trade_data_update.add_money_list()
        elif ctype == "position_list":
            huaxin_trade_data_update.add_position_list()
        elif ctype == "deal_list":
            huaxin_trade_data_update.add_deal_list()
        elif ctype == "position_list":
            huaxin_trade_data_update.add_delegate_list("手动刷新")
        send_response({"code": 0, "data": {}}, client_id, request_id)
    elif type_ == outside_api_command_manager.API_TYPE_COMMON_REQUEST:
        # 常规接口
        ctype = data['ctype']
        if ctype == 'get_account_money':
            # 获取账户资金
            pass
            result = huaxin_trade_record_manager.MoneyManager.get_data()
            send_response({"code": 0, "data": result}, client_id, request_id)
    logger_debug.info(f"接收到命令:{request_id} - f{client_id} - {data}")
@@ -110,4 +211,4 @@
    # ===========异步日志持久化==========
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
    # 运行L2数据监听队列
    l2_client_for_cb.run(trade_call_back_queue)
    l2_client_for_cb.run(trade_call_back_queue)
records/huaxin_trade_record_manager.py
@@ -248,13 +248,13 @@
            pass
    @classmethod
    def list_by_day(cls, day):
    def list_by_day(cls, day, code=None):
        mysqldb = mysql_data.Mysqldb()
        try:
            results = mysqldb.select_all(
                f"select * from hx_trade_position r where r.tradingDay='{day}'  order by createTime")
                f"select * from hx_trade_position r where r.tradingDay='{day}' { f'and securityID={code}' if code else ''} order by createTime")
            # 转dict
            key_list = ["id", "investorID", "securityName", "securityID", "historyPos", "historyPosFrozen",
            key_list = ["id", "investorID","tradingDay", "securityName", "securityID", "historyPos", "historyPosFrozen",
                        "todayBSPos", "todayBSPosFrozen", "historyPosPrice", "totalPosCost", "prePosition",
                        "availablePosition", "currentPosition",
                        "openPosCost", "todayCommission", "todayTotalBuyAmount", "todayTotalSellAmount", "createTime",
@@ -479,4 +479,4 @@
if __name__ == "__main__":
    print(DelegateRecordManager().list_current_delegates("600239"))
    print(len(PositionManager().list_by_day("20240520","123089")))
trade/huaxin_sell_util.py
New file
@@ -0,0 +1,33 @@
# 撤卖单
import time
from log_module import async_log_util
from log_module.log import logger_trade
import concurrent.futures
from trade import huaxin_trade_api
from utils import huaxin_util, sell_util
__cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
def __cancel_sell_order(code, order_ref):
    try:
        result = huaxin_trade_api.cancel_order(2, code, None, orderRef=order_ref)
    except Exception as e:
        logger_trade.exception(e)
# 开始撤单
def start_sell(code, volume, price_type, limit_up_price, limit_down_price, current_price, blocking=False,
               request_id=None,order_ref=None):
    price = sell_util.get_sell_price(price_type, limit_up_price, limit_down_price, current_price)
    if not price:
        raise Exception("价格获取出错")
    async_log_util.info(logger_trade, f"API卖:  单价-{price}")
    if not order_ref:
        order_ref = huaxin_util.create_order_ref()
    result = huaxin_trade_api.order(2, code, volume, price, order_ref=order_ref,
                                    blocking=blocking, request_id=request_id)
    return result
utils/sell_util.py
New file
@@ -0,0 +1,35 @@
"""
卖票相关类
"""
# 获取卖价
from utils import tool
def get_sell_price(price_type, limit_up_price, limit_down_price, current_price):
    price = None
    if price_type == 0:
        if not current_price:
            raise Exception("没有获取到L1现价")
        price = tool.get_buy_min_price(current_price)
        if limit_down_price and price < round(float(limit_down_price), 3):
            price = round(float(limit_down_price), 3)
    elif price_type == 1:
        if not limit_down_price:
            raise Exception("没有获取到跌停价")
        price = round(float(limit_down_price), 3)
    elif price_type == 2:
        if not limit_up_price:
            raise Exception("没有获取到涨停价")
        price = round(float(limit_up_price), 3)
    elif price_type == 3:
        if not current_price:
            raise Exception("没有获取到L1现价")
        price = current_price
    elif price_type == 4:
        if not current_price:
            raise Exception("没有获取到L1现价")
        price = round(float(current_price) - 0.05, 3)
    else:
        raise Exception("价格类型错误")
    return price