Administrator
2024-05-23 09991e316ce092d0b05a198aad9d58e78e06f69b
bug修复
3个文件已添加
5个文件已修改
220 ■■■■ 已修改文件
code_attribute/code_market_manager.py 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/target_codes_manager.py 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_for_cb.py 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_api.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin_trade_api.py 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/middle_api_protocol.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/output_util.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_market_manager.py
New file
@@ -0,0 +1,35 @@
"""
行情管理
"""
__market_info_dict = {}
class MarketInfo:
    def __init__(self, code, price, rate, buy1_price, buy1_volume, total_volume, total_bid_volume, total_ask_volume):
        self.code = code
        self.price = price
        self.rate = rate
        self.buy1_price = buy1_price
        self.buy1_volume = buy1_volume
        self.total_volume = total_volume
        self.total_bid_volume = total_bid_volume
        self.total_ask_volume = total_ask_volume
def set_market_info(data):
    """
    设置行情信息
    :param data: (代码, 最近的价格, 涨幅, 买1价, 买1量, 成交总量)
    :return:
    """
    __market_info_dict[data[0]] = MarketInfo(data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7])
def get_market_info(code) -> MarketInfo:
    """
    获取行情信息
    :param code:
    :return:
    """
    return __market_info_dict.get(code)
code_attribute/target_codes_manager.py
@@ -12,6 +12,10 @@
__valid_codes_cache = {}
# 正股到可转债代码的索引
__valid_underlying_code_map_cache = {}
# 可转债代码到正股代码的索引
__valid_cb_to_underlying_code_map_cache = {}
# 可转债昨日收盘价
__valid_pre_close_price_map_cache = {}
@@ -60,14 +64,16 @@
        __valid_codes_cache[day] = results
        __valid_underlying_code_map_cache[day] = {}
        __valid_pre_close_price_map_cache[day] = {}
        __valid_cb_to_underlying_code_map_cache[day] = {}
        for r in results:
            __valid_underlying_code_map_cache[day][r['underlying_symbol'].split('.')[1]] = r['sec_id']
            __valid_cb_to_underlying_code_map_cache[day][r['sec_id']] = r['underlying_symbol'].split('.')[1]
            __valid_pre_close_price_map_cache[day][r['sec_id']] = r['pre_close']
def get_subscript_codes():
def get_subscript_underlying_codes():
    """
    获取需要订阅的代码
    获取需要订阅的正股代码
    :return:
    """
    day = tool.get_now_date_str()
@@ -77,7 +83,19 @@
    return [x['underlying_symbol'].split('.')[1] for x in ffresults]
def get_underlying_code_map():
def get_subscript_cb_codes():
    """
    获取需要订阅的代码
    :return:
    """
    day = tool.get_now_date_str()
    if not __valid_codes_cache.get(day):
        load_valid_codes_info(tool.get_now_date_str())
    ffresults = __valid_codes_cache.get(tool.get_now_date_str())
    return [x['sec_id'] for x in ffresults]
def get_underlying_to_cb_code_map():
    """
    获取股票代码-可转债代码的map
    :param code:
@@ -90,13 +108,33 @@
    return __valid_underlying_code_map_cache.get(day)
def get_cb_to_underlying_code_map():
    """
    获取可转债代码-股票代码的map
    :param code:
    :return:
    """
    day = tool.get_now_date_str()
    if day not in __valid_cb_to_underlying_code_map_cache:
        load_valid_codes_info(day)
    return __valid_cb_to_underlying_code_map_cache.get(day)
def get_cb_code(code):
    """
    获取可转债代码
    :param code:
    :return:
    """
    map = get_underlying_code_map()
    map = get_underlying_to_cb_code_map()
    if map:
        return map.get(code)
    return None
def get_underlying_code(code):
    map = get_cb_to_underlying_code_map()
    if map:
        return map.get(code)
    return None
huaxin_client/l2_client_for_cb.py
@@ -12,7 +12,6 @@
import time
import concurrent.futures
from code_attribute import target_codes_manager
from code_attribute.history_k_data_util import JueJinApi, JueJinHttpApi
from huaxin_client import command_manager
@@ -50,8 +49,6 @@
market_code_dict = {}
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
    special_code_volume_for_order_dict = {}
    # 已经订阅的代码
@@ -76,9 +73,10 @@
        szse_codes = []
        sse_codes = []
        for code in codes:
            if code.find("00") == 0 or code.find("30") == 0:
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
            elif code.find("60") == 0 or code.find("68") == 0:
            elif  market_type == tool.MARKET_TYPE_SSE:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes
@@ -279,12 +277,20 @@
            rate = round(
                (pDepthMarketData['LastPrice'] - pDepthMarketData['PreClosePrice']) / pDepthMarketData['PreClosePrice'],
                4)
            # 代码, 最近的价格, 涨幅, 买1价, 买1量, 成交总量, 委托买入总量, 委托卖出总量
            market_call_back_queue.put_nowait((pDepthMarketData['SecurityID'], pDepthMarketData['LastPrice'], rate,
                                               pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1'],
                                               pDepthMarketData['TotalVolumeTrade'], pDepthMarketData['TotalBidVolume'], pDepthMarketData['TotalAskVolume']))
            code = pDepthMarketData['SecurityID']
            if code.find("00") == 0 or code.find("60") == 0:
                if rate >= 0.05:
                    self.__high_rate_codes.add(code)
                else:
                    self.__high_rate_codes.discard(code)
            elif code.find("11") == 0 or code.find("12") == 0:
                # 过滤可转债
                pass
            else:
                if rate >= 0.10:
                    self.__high_rate_codes.add(code)
@@ -383,29 +389,32 @@
    初始化数据
    :return:
    """
    codes = []
    market_codes = []
    # 获取目标代码
    for i in range(3):
        try:
            codes = target_codes_manager.get_subscript_codes()
            if codes:
            underlying_codes = target_codes_manager.get_subscript_underlying_codes()
            cb_codes = target_codes_manager.get_subscript_cb_codes()
            if underlying_codes and cb_codes:
                market_codes.extend(underlying_codes)
                market_codes.extend(cb_codes)
                break
        except Exception as e:
            logger_system.exception(e)
            time.sleep(5)
    logger_system.info(f'可转债正股数量:{len(codes)}')
    if codes:
    logger_system.info(f'订阅行情数量:{len(market_codes)}')
    if market_codes:
        # 获取目标代码的收盘价
        pre_price_dict = {}
        for i in range(3):
            try:
                pre_price_dict = get_pre_price(codes)
                pre_price_dict = get_pre_price(market_codes)
                if pre_price_dict:
                    break
            except Exception as e:
                logger_system.exception(e)
                time.sleep(5)
        logger_system.info(f'可转债正股昨日收盘价数量:{len(codes)}')
        logger_system.info(f'昨日收盘价数量:{len(market_codes)}')
        if pre_price_dict:
            for k in pre_price_dict:
                if k.find("00") == 0 or k.find("60") == 0:
@@ -413,7 +422,7 @@
                else:
                    limit_up_price = tool.to_price(decimal.Decimal(str(pre_price_dict[k])) * decimal.Decimal("1.2"))
                Lev2MdSpi.limit_up_price_dict[k] = round(float(limit_up_price), 2)
    return codes
    return market_codes
def start_sub_high_price():
@@ -425,7 +434,7 @@
        time.sleep(3)
def run(trade_call_back_queue_: multiprocessing.Queue) -> None:
def run(trade_call_back_queue_: multiprocessing.Queue, market_call_back_queue_: multiprocessing.Queue) -> None:
    """
    先订阅所有的L2market行情数据,筛选出比较大的涨幅(主板>5%,科创板/创业板>10%)的票,然后订阅其交成交L2数据
    :param trade_call_back_queue_: 添加的内容格式为:(代码,交易时间)
@@ -436,8 +445,9 @@
    try:
        # log.close_print()
        # 初始化
        global trade_call_back_queue
        global trade_call_back_queue, market_call_back_queue
        trade_call_back_queue = trade_call_back_queue_
        market_call_back_queue = market_call_back_queue_
        codes = __init_data()
        __init_l2(codes)
main.py
@@ -7,13 +7,14 @@
import threading
import time
from code_attribute import target_codes_manager, gpcode_manager
from code_attribute import target_codes_manager, gpcode_manager, code_market_manager
from huaxin_client import l2_client_for_cb, trade_client_for_cb
from huaxin_client.client_network import SendResponseSkManager
from log_module import async_log_util
from records import huaxin_trade_record_manager
from trade import huaxin_trade_api, huaxin_trade_data_update, huaxin_sell_util
from utils import middle_api_protocol, outside_api_command_manager, constant, tool, huaxin_util, socket_util, sell_util
from utils import middle_api_protocol, outside_api_command_manager, constant, tool, huaxin_util, socket_util, sell_util, \
    output_util
middle_api_protocol.SERVER_PORT = 10008
middle_api_protocol.SERVER_HOST = "43.138.167.68"
@@ -123,6 +124,31 @@
        # 查询此仓
        code = data.get("code")
        results = huaxin_trade_record_manager.PositionManager().list_by_day(tool.get_now_date_str("%Y%m%d"), code)
        for r in results:
            cb_code = r["securityID"]
            underlying_code = target_codes_manager.get_underlying_code(cb_code)
            cb_market = code_market_manager.get_market_info(code)
            underlying_market = code_market_manager.get_market_info(underlying_code)
            if cb_market:
                r["marketInfo"] = {"code": cb_market.code, "name": r["securityName"],
                                   "rate": f"{cb_market.rate * 100}%",
                                   "price": cb_market.price, "lastVolume": cb_market.total_bid_volume // 100,
                                   "buy1Money": output_util.money_desc(cb_market.buy1_price * cb_market.buy1_volume)}
            if underlying_market:
                if not gpcode_manager.CodesNameManager().get_code_name(underlying_market.code):
                    # 异步请求名称
                    threading.Thread(
                        target=lambda: gpcode_manager.CodesNameManager().request_code_name(underlying_market.code),
                        daemon=True).start()
                r["underlyingMarketInfo"] = {"code": underlying_market.code,
                                             "name": gpcode_manager.CodesNameManager().get_code_name(
                                                 underlying_market.code), "rate": f"{underlying_market.rate * 100}%",
                                             "price": underlying_market.price,
                                             "lastVolume": underlying_market.total_bid_volume // 100,
                                             "buy1Money": output_util.money_desc(
                                                 underlying_market.buy1_price * underlying_market.buy1_volume)}
        send_response({"code": 0, "data": results}, client_id, request_id)
    elif type_ == "refresh_trade_data":
        # 刷新交易数据
@@ -178,6 +204,20 @@
            pass
def __read_market_data(queue_market: multiprocessing.Queue):
    while True:
        try:
            result = queue_market.get()
            if result:
                # 代码, 最近的价格, 涨幅, 买1价, 买1量, 成交总量
                code_market_manager.set_market_info(result)
        except Exception as e:
            logger_debug.exception(e)
            time.sleep(1)
        finally:
            pass
if __name__ == '__main__':
    # ===========初始化数据==========
    try:
@@ -214,4 +254,6 @@
    # ===========异步日志持久化==========
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
    # 运行L2数据监听队列
    l2_client_for_cb.run(trade_call_back_queue)
    queue_market = multiprocessing.Queue()
    threading.Thread(target=__read_market_data, args=(queue_market,), daemon=True).start()
    l2_client_for_cb.run(trade_call_back_queue, queue_market)
test/test_api.py
New file
@@ -0,0 +1,4 @@
from trade import huaxin_trade_api
if __name__ == "__main__":
    print(huaxin_trade_api.get_money())
trade/huaxin_trade_api.py
@@ -22,7 +22,7 @@
from records.huaxin_trade_record_manager import TradeOrderIdManager
from trade import huaxin_trade_data_update
from trade.entity import HuaxinOrderEntity
from utils import socket_util, huaxin_util, tool
from utils import socket_util, huaxin_util, tool, middle_api_protocol
__response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15)
__save_data_queue = queue.Queue()
@@ -269,6 +269,8 @@
# 超时时间2s
TIMEOUT = 2.0
# 交易代理
TRADE_DELEGATED = True
# 等待响应的request_id
__request_response_dict = {}
@@ -276,6 +278,15 @@
def __get_request_id(type):
    return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}"
def __request_delegate(request_id, type, data):
    fdata = middle_api_protocol.load_simulation_trade(type, data)
    try:
        result = middle_api_protocol.request(fdata, port=10020)
        set_response(request_id, result)
    except Exception as e:
        pass
# 网络请求
@@ -301,10 +312,13 @@
                     }
        root_data = socket_util.encryp_client_params_sign(root_data)
        start_time = time.time()
        if is_trade:
            queue_strategy_w_trade_r.put_nowait(root_data)
        if not TRADE_DELEGATED:
            if is_trade:
                queue_strategy_w_trade_r.put_nowait(root_data)
            else:
                queue_strategy_w_trade_r_for_query.put_nowait(root_data)
        else:
            queue_strategy_w_trade_r_for_query.put_nowait(root_data)
            threading.Thread(target=__request_delegate, args=(request_id, _type, data,), daemon=True).start()
        use_time = int((time.time() - start_time) * 1000)
        if use_time > 10:
@@ -471,7 +485,6 @@
    request_id = __request(ClientSocketManager.CLIENT_TYPE_MONEY,
                           {"type": ClientSocketManager.CLIENT_TYPE_MONEY})
    return __read_response(request_id, blocking)
# 设置L2订阅数据
utils/middle_api_protocol.py
@@ -9,11 +9,11 @@
SERVER_PORT = 10008
def request(data_json):
def request(data_json, host=SERVER_HOST, port=SERVER_PORT):
    if type(data_json) == set:
        data_json = list(data_json)
    data_bytes = socket_util.load_header(json.dumps(data_json).encode('utf-8'))
    sk = socket_util.create_socket(SERVER_HOST, SERVER_PORT)
    sk = socket_util.create_socket(host, port)
    try:
        sk.sendall(data_bytes)
        result_str, header_str = socket_util.recv_data(sk)
@@ -71,3 +71,9 @@
def load_push_msg(data):
    fdata = {"type": "push_msg", "data": {"ctype": "push_msg", "data": data}}
    return fdata
# ------------------------------仿真交易------------------------------------
def load_simulation_trade(type, data):
    fdata = {"type": "simulation_trade", "data": {"ctype": type, "data": data}}
    return fdata
utils/output_util.py
New file
@@ -0,0 +1,14 @@
import time
def money_desc(money):
    if abs(money) > 100000000:
        return f"{round(money / 100000000, 2)}亿"
    else:
        return f"{round(money / 10000, 2)}万"
def time_format(timestamp):
    if timestamp:
        return time.strftime("%H:%M:%S", time.localtime(int(timestamp)))
    return ""