Administrator
2023-08-23 f273791e2337215a2a3bd7e3c46c23c69bcb1c7c
bug修复
5个文件已修改
1个文件已添加
181 ■■■■ 已修改文件
huaxin_client/l1_client.py 48 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_subscript_codes_manager.py 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
outside_api_command_manager.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py
@@ -5,7 +5,7 @@
import threading
import time
from huaxin_client import socket_util
from huaxin_client import socket_util, l1_subscript_codes_manager
import xmdapi
from huaxin_client import tool
from huaxin_client.client_network import SendResponseSkManager
@@ -25,38 +25,6 @@
        if result_json.get("code") == 0:
            return True
    return False
def get_level1_codes():
    """Request the Level-1 subscription target codes over the response socket.

    Sends a ``get_level1_codes`` request and, when the reply carries
    ``code == 0``, splits the returned codes into two lists of UTF-8 encoded
    ``bytes``: codes that start with ``"00"`` go into ``codes_sz``, all others
    into ``codes_sh``. Up to three attempts are made.

    Returns:
        ``(codes_sh, codes_sz)`` on success, or ``(None, None)`` when no
        attempt produced a reply with ``code == 0``.
    """
    type_ = "get_level1_codes"
    payload = json.dumps({"type": type_, "data": {}}).encode("utf-8")
    for _ in range(3):
        try:
            sk = SendResponseSkManager.create_send_response_sk()
            # Frame the pristine payload on every attempt. Rebinding the
            # payload variable to the framed bytes would make each retry wrap
            # an already-framed message in another header.
            sk.sendall(socket_util.load_header(payload))
            result, _header_str = socket_util.recv_data(sk)
            result_json = json.loads(result)
            if result_json["code"] == 0:
                codes_sh = []
                codes_sz = []
                for code in result_json["data"]:
                    encoded = code.encode("utf-8")
                    if code.startswith("00"):
                        codes_sz.append(encoded)
                    else:
                        codes_sh.append(encoded)
                print("获取订阅目标代数量:", len(codes_sh), len(codes_sz))
                return codes_sh, codes_sz
        except (ConnectionResetError, BrokenPipeError):
            # Drop the broken socket registration before the next attempt.
            SendResponseSkManager.del_send_response_sk(type_)
    return None, None
class MdSpi(xmdapi.CTORATstpXMdSpi):
@@ -149,6 +117,9 @@
        #        pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice))
__latest_subscript_codes = set()
def __upload_codes_info(pipe_l2, datas):
    if not tool.is_trade_time():
        return
@@ -158,6 +129,13 @@
        {"type": type_, "data": {"data": datas}})
    if pipe_l2 is not None:
        pipe_l2.send(fdata)
    # 记录新增加的代码
    codes = set([x[0] for x in datas])
    add_codes = codes - __latest_subscript_codes
    __latest_subscript_codes.clear()
    for c in codes:
        __latest_subscript_codes.add(c)
    logger_local_huaxin_l1.info(f"新增加订阅的代码:{add_codes}")
def run(pipe_l2):
@@ -166,7 +144,7 @@
    codes_sz = []
    for i in range(15):
        try:
            codes_sh, codes_sz = get_level1_codes()
            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
            logger_local_huaxin_l1.info(f"获取上证,深证代码数量:sh-{len(codes_sh)} sz-{len(codes_sz)}")
            break
        except Exception as e:
@@ -204,6 +182,8 @@
            if len(level1_data_dict) < 1:
                continue
            # 根据涨幅排序
            # (代码,现价,涨幅,量,时间)
            list_ = [level1_data_dict[k] for k in level1_data_dict]
            flist = []
            for d in list_:
huaxin_client/l1_subscript_codes_manager.py
New file
@@ -0,0 +1,98 @@
"""
Management of the codes that L1 needs to subscribe to.
"""
import json
import os
import constant
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
# 请求l1订阅的目标代码
def request_l1_subscript_target_codes():
    """Request the Level-1 subscription target codes over the response socket.

    Sends a ``get_level1_codes`` request and, when the reply carries
    ``code == 0``, splits the returned codes into two lists of UTF-8 encoded
    ``bytes``: codes that start with ``"00"`` go into ``codes_sz``, all others
    into ``codes_sh``. Up to three attempts are made.

    Returns:
        ``(codes_sh, codes_sz)`` on success, or ``(None, None)`` when no
        attempt produced a reply with ``code == 0``.
    """
    type_ = "get_level1_codes"
    payload = json.dumps({"type": type_, "data": {}}).encode("utf-8")
    for _ in range(3):
        try:
            sk = SendResponseSkManager.create_send_response_sk()
            # Frame the pristine payload on every attempt. Rebinding the
            # payload variable to the framed bytes would make each retry wrap
            # an already-framed message in another header.
            sk.sendall(socket_util.load_header(payload))
            result, _header_str = socket_util.recv_data(sk)
            result_json = json.loads(result)
            if result_json["code"] == 0:
                codes_sh = []
                codes_sz = []
                for code in result_json["data"]:
                    encoded = code.encode("utf-8")
                    if code.startswith("00"):
                        codes_sz.append(encoded)
                    else:
                        codes_sh.append(encoded)
                print("获取订阅目标代数量:", len(codes_sh), len(codes_sz))
                return codes_sh, codes_sz
        except (ConnectionResetError, BrokenPipeError):
            # Drop the broken socket registration before the next attempt.
            SendResponseSkManager.del_send_response_sk(type_)
    return None, None
# Directory for the persisted code lists, located under the project path prefix.
__DIR_PATH = f"{constant.get_path_prefix()}/codes"
# Text file holding the ``codes_sh`` list.
__CODE_SH_PATH = f"{__DIR_PATH}/codes_sh.text"
# Text file holding the ``codes_sz`` list.
__CODE_SZ_PATH = f"{__DIR_PATH}/codes_sz.text"
# 保存目标代码
def _write_code_lines(path, codes):
    """Write *codes* to *path*, one per line, decoding ``bytes`` items as UTF-8."""
    with open(path, 'w') as f:
        for c in codes:
            # The file is opened in text mode, so bytes must be decoded first.
            f.write(c.decode('utf-8') if isinstance(c, bytes) else c)
            f.write("\n")


def save_codes(codes_sh, codes_sz):
    """Persist the two subscription code lists to their text files.

    Args:
        codes_sh: iterable of codes (``str`` or UTF-8 ``bytes``) written to
            the ``codes_sh`` file.
        codes_sz: iterable of codes (``str`` or UTF-8 ``bytes``) written to
            the ``codes_sz`` file.

    Both files are overwritten. The target directory is created when missing.
    """
    # exist_ok avoids failing when the directory appears between check and create.
    os.makedirs(__DIR_PATH, exist_ok=True)
    _write_code_lines(__CODE_SH_PATH, codes_sh)
    _write_code_lines(__CODE_SZ_PATH, codes_sz)
def get_codes_from_file():
    """Load the persisted code lists from disk.

    Returns:
        ``(codes_sh, codes_sz)``: two lists of UTF-8 encoded ``bytes``, one
        entry per non-blank line of the respective file. A missing file
        yields an empty list.
    """

    def _read_codes(path):
        # A file that does not exist contributes no codes.
        if not os.path.exists(path):
            return []
        loaded = []
        with open(path, 'r') as fh:
            for raw_line in fh:
                text = raw_line.strip()
                if text:
                    loaded.append(text.encode('utf-8'))
        return loaded

    sh_codes = _read_codes(__CODE_SH_PATH)
    sz_codes = _read_codes(__CODE_SZ_PATH)
    return sh_codes, sz_codes
def get_codes():
    """Return the subscription codes, preferring the locally persisted copy.

    The lists read from disk are used when both are non-empty. Otherwise the
    result of ``request_l1_subscript_target_codes()`` is returned as-is.
    """
    cached_sh, cached_sz = get_codes_from_file()
    if cached_sh and cached_sz:
        return cached_sh, cached_sz
    # At least one list is empty: fall back to asking the server.
    return request_l1_subscript_target_codes()
# Running this module directly does nothing; it is meant to be imported.
if __name__ == '__main__':
    pass
huaxin_client/l2_data_manager.py
@@ -60,7 +60,7 @@
    tmep_order_detail_queue_dict[code].put(
        (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
         data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus']), int(time.time()*1000))
         data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], int(time.time()*1000)))
# 添加逐笔成交
outside_api_command_manager.py
@@ -51,6 +51,7 @@
API_TYPE_CODE_ATRRIBUTE = "code_attribute"  # code attributes
API_TYPE_CODE_TRADE_STATE = "code_trade_state"  # trading state of a code
API_TYPE_GET_ENV = "get_env"  # get environment information
API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes"  # sync the codes that L1 needs to subscribe to
class ActionCallback(object):
@@ -86,6 +87,9 @@
        pass
    def OnGetEnvInfo(self, client_id, request_id, data):
        """Callback for ``get_env`` requests; this default implementation does nothing."""
        pass
    def OnSyncL2SubscriptCodes(self, client_id, request_id):
        """Callback for syncing subscription codes; this default implementation does nothing.

        NOTE(review): the name says L2, but the matching API type constant is
        ``sync_l1_subscript_codes`` - confirm which level is intended.
        """
        pass
@@ -181,6 +185,8 @@
                            cls.action_callback.OnGetCodeTradeState(client_id, request_id, data)
                        elif content_type == API_TYPE_GET_ENV:
                            cls.action_callback.OnGetEnvInfo(client_id, request_id, data)
                        elif content_type == API_TYPE_GET_ENV:
                            cls.action_callback.OnSyncL2SubscriptCodes(client_id, request_id)
                    except Exception as e:
                        logging.exception(e)
                        pass
test/test.py
@@ -1,9 +1,8 @@
import logging
import queue
import threading
import time
from log_module.log import logger_l2_process_time, logger_debug
from huaxin_client import l1_subscript_codes_manager
from log_module.log import logger_debug
_dict = {}
@@ -18,10 +17,6 @@
if __name__ == "__main__":
    for k in range(1):
        _dict.clear()
        for i in range(0, 1000):
            threading.Thread(target=lambda: add(i), daemon=True).start()
        time.sleep(2)
    input()
    # l1_subscript_codes_manager.save_codes(["600100", "600102"], ["000123", "000146"])
    code_sh,codes_sz =l1_subscript_codes_manager.request_l1_subscript_target_codes()
    l1_subscript_codes_manager.save_codes(code_sh,codes_sz)
trade/huaxin/trade_server.py
@@ -21,6 +21,7 @@
from code_attribute import gpcode_manager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l1_subscript_codes_manager
from huaxin_client.client_network import SendResponseSkManager
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer
@@ -217,8 +218,7 @@
                                            code)
                                        logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code,
                                                                       buy_progress_index)
                                        buy_time = total_datas[buy_progress_index]["val"][
                                            "time"]
                                        buy_time = total_datas[buy_progress_index]["val"]["time"]
                                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if buy_exec_index:
                                            need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
@@ -680,6 +680,14 @@
        except Exception as e:
            self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
    # Sync the L1 subscription codes (the handler name says L2, but it refreshes the L1 target codes)
    def OnSyncL2SubscriptCodes(self, client_id, request_id):
        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
        if codes_sh and codes_sz:
            l1_subscript_codes_manager.save_codes(codes_sh, codes_sz)
        result = {"code": 0, "data": {"codes_sh": len(codes_sh), "codes_sz": len(codes_sz)}}
        self.send_response(result, client_id, request_id)
def run(pipe_trade, pipe_l1):
    # 执行一些初始化数据