Administrator
2023-07-17 a9681c7b03a6fde559bf77ef65917d6d4db5d84c
华鑫适配
7个文件已修改
1个文件已添加
148 ■■■■ 已修改文件
l2/cancel_buy_strategy.py 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_delegate_postion_manager.py 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_huaxin.py 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -674,12 +674,23 @@
            return int(val)
        return None
    @classmethod
    def clear(cls, code=None):
        if code:
            cls.__getRedis().delete(f"d_cancel_real_order_index-{code}")
        else:
            keys = cls.__getRedis().keys("d_cancel_real_order_index-*")
            if keys:
                for k in keys:
                    cls.__getRedis().delete(k)
    # 设置成交位
    @classmethod
    def set_trade_progress(cls, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value,
                           limit_up_price):
        # 离下单执行位2分钟内的有效
        if tool.trade_time_sub(total_data[-1]['val']['time'], total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME:
        if tool.trade_time_sub(total_data[-1]['val']['time'],
                               total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME:
            return False, "超过D撤守护时间"
        real_order_index = cls.__get_real_order_index(code)
@@ -709,7 +720,11 @@
    # 设置真实的下单位置
    @classmethod
    def set_real_order_index(cls, code, index):
        pass
        cls.__set_real_order_index(code, index)
    @classmethod
    def cancel_success(cls, code):
        cls.clear(code)
# ---------------------------------L撤-------------------------------
l2/huaxin/huaxin_delegate_postion_manager.py
New file
@@ -0,0 +1,47 @@
"""
华鑫委托实际位置管理
"""
import time
from log_module.log import hx_logger_trade_debug
# Latest order per code: code -> (price, volume, exec_index, time the order was recorded)
_place_order_info_dict = {}
# Record an order
def place_order(code, price, volume, exec_index):
    """Remember the order just placed for ``code``.

    The entry replaces any previous one for the same code and is stamped with
    the current ``time.time()`` so that it can be expired later.
    """
    placed_at = time.time()
    _place_order_info_dict[code] = (price, volume, exec_index, placed_at)
# Fetch the recorded order
def get_order_info(code):
    """Return the recorded order tuple for ``code``, or None if absent or expired.

    The tuple is ``(price, volume, exec_index, timestamp)`` as stored by
    ``place_order``. An entry older than 3 seconds is removed from the store
    and None is returned instead.
    """
    record = _place_order_info_dict.get(code)
    if not record:
        return record
    age = time.time() - record[3]
    if age <= 3:
        return record
    # More than 3s have passed since the order was recorded: it is no longer valid
    _place_order_info_dict.pop(code)
    return None
# Locate our own order inside a list of L2 data
def get_l2_place_order_position(code, datas):
    """Find the L2 record that corresponds to the order just placed for ``code``.

    A record matches when all of the following hold:
      * ``d["val"]["num"]`` equals the recorded volume,
      * ``d["val"]["price"]`` differs from the recorded price by less than one cent,
      * ``d["index"]`` is greater than the recorded exec_index.

    :param code: stock code whose recorded order is looked up via ``get_order_info``.
    :param datas: iterable of L2 records (dicts with ``index`` and ``val`` keys);
        an empty or None value yields None.
    :return: the ``index`` of the first matching record, or None when there is
        no valid recorded order or no record matches.
    """
    order_info = get_order_info(code)
    if not order_info:
        # No (unexpired) order information yet
        return None
    price, volume, exec_index = order_info[:3]
    for d in datas or ():
        val = d["val"]
        if val["num"] != volume:
            continue
        # Binary floats make e.g. 10.01 - 10.00 evaluate to 0.00999..., which
        # would slip under a raw ">= 0.01" test and accept a price that is one
        # cent off. Round the gap before comparing.
        price_gap = round(abs(float(price) - float(val["price"])), 4)
        if price_gap >= 0.01:
            continue
        # The real order cannot appear at or before the exec position
        if d["index"] <= exec_index:
            continue
        # Found the position of our order
        hx_logger_trade_debug.info(f"真实下单位置:{code}-{d['index']}")
        return d["index"]
    return None
l2/l2_data_manager_new.py
@@ -4,7 +4,7 @@
from code_attribute import big_money_num_manager, code_volumn_manager, code_data_util, industry_codes_sort, \
    limit_up_time_manager, global_data_loader, gpcode_manager
import constant
from l2.huaxin import l2_huaxin_util
from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager
from utils import global_util, ths_industry_util, tool
import l2_data_util
from db import redis_manager
@@ -13,7 +13,7 @@
    trade_result_manager, first_code_score_manager
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util, code_price_manager
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
    L2LimitUpSellStatisticUtil
    L2LimitUpSellStatisticUtil, DCancelBigNumComputer
from l2.l2_data_manager import L2DataException, TradePointManager
from l2.l2_data_util import local_today_datas, L2DataUtil, local_today_num_operate_map, local_today_buyno_map, \
    local_latest_datas
@@ -231,10 +231,13 @@
                _start_index = local_today_datas[code][-1]["index"] + 1
            datas = l2_huaxin_util.get_format_l2_datas(code, datas,
                                                       gpcode_manager.get_limit_up_price(code), _start_index)
            # 获取下单位置
            place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, datas)
            if place_order_index:
                DCancelBigNumComputer.set_real_order_index(code, place_order_index)
            __start_time = round(t.time() * 1000)
            print("格式化L2数据成功", code)
            cls.process_add_datas(code, datas, 0, __start_time)
            print("huaxin L2数据处理成功", code)
        except Exception as e:
            print("huaxin L2数据处理异常", code, str(e))
            logging.exception(e)
trade/huaxin/huaxin_trade_api.py
@@ -90,6 +90,14 @@
    def heart(cls, rid):
        cls.active_client_dict[rid] = time.time()
    @classmethod
    def del_invalid_clients(cls):
        # 清除长时间无心跳的客户端通道
        for k in cls.active_client_dict.keys():
            if time.time() - cls.active_client_dict[k] > 20:
                # 心跳时间间隔20s以上视为无效
                cls.del_client(k)
# Trade direction codes (buy / sell)
TRADE_DIRECTION_BUY = 1
TRADE_DIRECTION_SELL = 2
trade/huaxin/trade_server.py
@@ -5,17 +5,18 @@
import random
import socket
import socketserver
import threading
import time
from code_attribute import gpcode_manager
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer
from l2.huaxin import huaxin_target_codes_manager
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue
from trade import deal_big_money_manager
from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server
from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server, huaxin_trade_api
trade_data_request_queue = queue.Queue()
@@ -173,8 +174,10 @@
                                code)
                            if True:
                                if buy_progress_index is not None:
                                    logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code, buy_progress_index)
                                    buy_time = l2_data_util.local_today_datas.get(code)[buy_progress_index]["val"]["time"]
                                    logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code,
                                                                   buy_progress_index)
                                    buy_time = l2_data_util.local_today_datas.get(code)[buy_progress_index]["val"][
                                        "time"]
                                    HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
                                                                                buy_progress_index,
                                                                                l2_data_util.local_today_datas.get(
@@ -185,13 +188,18 @@
                                                                             l2_data_util.local_today_datas.get(
                                                                                 code))
                                    # 计算大单成交额
                                    deal_big_money_manager.set_trade_progress(code, buy_progress_index,
                                                                              l2_data_util.local_today_datas.get(
                                                                                  code),
                                                                              l2_data_util.local_today_num_operate_map.get(
                                                                                  code))
                                    DCancelBigNumComputer.set_trade_progress(code, buy_progress_index, buy_exec_index,
                                                                             l2_data_util.local_today_datas.get(
                                                                                 code),
                                                                             l2_data_util.local_today_num_operate_map.get(
                                                                                 code), 1000 * 10000,
                                                                             gpcode_manager.get_limit_up_price(code))
                        except Exception as e:
                            hx_logger_l2_transaction.exception(e)
@@ -232,8 +240,21 @@
        super().finish()
def clear_invalid_client():
    """Loop forever, pruning clients without a recent heartbeat every 2 seconds.

    Calls ``ClientSocketManager.del_invalid_clients()`` and then sleeps 2s,
    whether or not the call succeeded. This function never returns.
    """
    # Local import so this block does not depend on the file's import list
    import logging

    while True:
        try:
            huaxin_trade_api.ClientSocketManager.del_invalid_clients()
        except Exception as e:
            # Keep the cleanup loop alive, but record the failure instead of
            # silently discarding it
            logging.exception(e)
        finally:
            time.sleep(2)
def run():
    """Start the client-cleanup daemon thread, then serve TCP requests forever on 0.0.0.0:10008."""
    print("create TradeServer")
    cleaner = threading.Thread(target=clear_invalid_client, daemon=True)
    cleaner.start()
    listen_address = ("0.0.0.0", 10008)
    # Note: the handler argument is MyBaseRequestHandle
    server = MyThreadingTCPServer(listen_address, MyBaseRequestHandle)
    server.serve_forever()
trade/trade_huaxin.py
@@ -9,6 +9,7 @@
from log_module.log import logger_juejin_trade
from trade.huaxin import huaxin_trade_api
from utils import tool, huaxin_util
from l2 import huaxin
__context_dict = {}
@@ -52,17 +53,15 @@
# 通过量下单,返回(代码,账号ID,订单号)
def order_volume(code, price, count):
    if not constant.TRADE_ENABLE:
        return
def order_volume(code, price, count, last_data_index):
    if code.find("00") != 0 and code.find("60") != 0:
        raise Exception("只支持00开头与60开头的代码下单")
    code_str = code
    if code[0:2] == '00':
        code_str = f"SZSE.{code}"
    elif code[0:2] == '60':
        code_str = f"SHSE.{code}"
    start_time = time.time()
    # 保存下单信息
    huaxin.huaxin_delegate_postion_manager.place_order(code, price, count, last_data_index)
    if not constant.TRADE_ENABLE:
        return
    result = huaxin_trade_api.order(1, code, count, price)
    print("华鑫下单耗时", time.time() - start_time)
    logger_juejin_trade.info(f"{code}:下单耗时{round(time.time() - start_time, 3)}s")
@@ -72,8 +71,8 @@
        if result['code'] == 0:
            result = result["data"]
            if result["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected:
                logger_juejin_trade.info(f"{code}:下单失败:{result['statusMsg']}")
                raise Exception(result["statusMsg"])
                logger_juejin_trade.info(f"{code}:下单失败:{result.get('statusMsg')}")
                raise Exception(result.get('statusMsg'))
            else:
                TradeOrderIdManager.add_order_id(code, result["accountID"], result["orderSysID"])
                logger_juejin_trade.info(f"{code}:下单成功 orderSysID:{result['orderSysID']}")
trade/trade_manager.py
@@ -357,7 +357,7 @@
            if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN:
                trade_juejin.order_volume(code, price, count)
            elif constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN:
                trade_huaxin.order_volume(code, price, count)
                trade_huaxin.order_volume(code, price, count,last_data_index)
        else:
            guiTrade.buy(code, price)
        __place_order_success(code, capture_timestamp, last_data, last_data_index)
trade/trade_result_manager.py
@@ -5,7 +5,7 @@
from l2 import l2_data_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, L2LimitUpSellStatisticUtil, \
    LCancelBigNumComputer
    LCancelBigNumComputer, DCancelBigNumComputer
from l2.l2_data_util import local_today_datas, local_today_num_operate_map
from l2.safe_count_manager import BuyL2SafeCountManager
from log_module.log import logger_l2_error
@@ -33,7 +33,8 @@
    f5 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index,
                                                                     total_datas[-1]["index"])
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    dask.compute(f1, f2, f3, f4, f5, f6)
    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
    dask.compute(f1, f2, f3, f4, f5, f6, f7)
# 真实买成功
@@ -91,7 +92,8 @@
    f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code)
    f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    dask.compute(f1, f2, f3, f4, f5, f6)
    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
    dask.compute(f1, f2, f3, f4, f5, f6, f7)
if __name__ == "__main__":
@@ -101,4 +103,5 @@
    f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code)
    f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
    f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
    dask.compute(f2, f3, f4, f5, f6)
    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
    dask.compute(f2, f3, f4, f5, f6, f7)