Administrator
2 天以前 ad3cc1a24c4b413bae3069cc6d1c2a1923540ce3
开盘啦TCP请求/委托数据持久化异步处理
1个文件已添加
5个文件已修改
211 ■■■■ 已修改文件
api/outside_api_command_callback.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_app_util_of_tcp.py 152 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_record_manager.py 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_command_callback.py
@@ -1556,7 +1556,7 @@
                                       request_id)
                    return
                rate = round(rate, 2)
                old_rate = LCancelRateManager().get_cancel_rate(0)[0]
                old_rate = LCancelRateManager().get_cancel_rate(code)[0]
                CancelRateHumanSettingManager().set_l_down(code, rate)
                # L后重新囊括
                # if rate < old_rate:
constant.py
@@ -134,7 +134,7 @@
L_CANCEL_MIN_WATCH_COUNT = 10
# 撤单比例
L_CANCEL_RATE = 0.39  # L后
L_CANCEL_RATE = 0.3  # L后
L_CANCEL_RATE_WITH_MUST_BUY = 0.9  # 加红L后
servers/huaxin_trade_server.py
@@ -1153,6 +1153,8 @@
    gpcode_manager.WantBuyCodesManager()
    # 加载历史K线数据
    HistoryKDataManager().load_data()
    # 队列持久化
    threading.Thread(target=lambda: DelegateRecordManager().run(), daemon=True).start()
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr):
third_data/kpl_api.py
@@ -79,23 +79,6 @@
    return json.dumps({"errcode": 0, "list": fresults, "day": day})
def getLimitUpInfo():
    list_ = []
    page_size = 20
    MAX_SIZE = 150
    for i in range(0, 10):
        result_str = daBanList(DABAN_TYPE_LIMIT_UP, page_size=page_size, index=len(list_))
        result = json.loads(result_str)
        temp_list = result["list"]
        list_ += temp_list
        if len(temp_list) < page_size:
            result['list'] = list_
            return json.dumps(result)
        elif len(list_) > MAX_SIZE:
            return json.dumps(result)
    return None
def getHistoryLimitUpInfo(day):
    fresults = []
    for i in range(0, 100):
@@ -357,8 +340,10 @@
    #     print(r)
    # result = getCodeJingXuanBlocks("000756", True)
    # for x in result:
    result = getHistoryCodesByPlateOrderByLZCS("801074", "2025-05-16", "1025")
    print(result)
    results = getLimitUpInfoNew()
    results = json.loads(results)
    for d in results['list']:
        print(len(d), d)
    # request_new_blocks_codes([("机器人", "801159")])
    # result = getCodesByPlate("801159")  # getHistoryLimitUpInfo("2024-02-19")
third_data/kpl_app_util_of_tcp.py
New file
@@ -0,0 +1,152 @@
"""
TSL-TCP版的接口
"""
import codecs
import logging
import re
import socket
import ssl
class DataParseUtil:
    @classmethod
    def parse_limit_up_data(cls, hex_str, enable_log=False):
        limit_up_data_list = []
        results = re.split(r"\sA2\s01\s\w{2}\s01\s0A\s06\s", hex_str)
        # 涨停的日期
        day = None
        for i in range(1, len(results)):
            try:
                result = results[i]
                if enable_log:
                    print(bytes.fromhex(result))
                temp_results = re.finditer(r"\s12\s\w{2}\s", result)
                for match in temp_results:
                    code = bytes.fromhex(result[:match.start()]).decode('utf-8')
                    result = result[match.end():]
                    break
                # print(bytes.fromhex(result))
                # print(result)
                temp_results = re.split(r"\sA2\s06\s[A-Z0-9]{2}\s", result)
                contents = [code]
                pattern = r'[^\u4e00-\u9fa5a-zA-Z0-9\u3001\.\-]'  # 中文/字母/数字/中文顿号
                for j in range(len(temp_results)):
                    content = temp_results[j]
                    if j == 0:
                        # 代码名称
                        code_name = re.split(r"\s1A\s06", content)[0]
                        code_name = bytes.fromhex(code_name).decode('utf-8')
                        # 替换所有非中英文数字字符为空字符串
                        code_name = re.sub(pattern, '', code_name)
                        contents.append(code_name)
                    else:
                        content_str = bytes.fromhex(content).decode('utf-8', errors='ignore')
                        content_str = re.sub(pattern, '', content_str)
                        if len(contents) >= 20:
                            # 最多20项数据
                            day = content_str
                            break
                        contents.append(content_str)
                # contents 格式: [代码,名称,未知,未知,未知,成交额,涨停时间,高度, 涨停原因, 推荐原因,封单金额, 主力净额, 实际换手百分比, 实际流通市值, 最大封单金额, 最近涨停时间, 涨停原因的代码个数, 涨停原因的编码, 未知, 未知]
                if enable_log:
                    print(i, len(contents), contents)
                limit_up_data_list.append(contents)
            except Exception as e:
                logging.exception(e)
        return limit_up_data_list, day
def __recieve_data(ssl_sock):
    """
    接收数据请求
    """
    response = ssl_sock.read(3)
    hex_str = codecs.encode(response, 'hex').decode('utf-8')
    content_length = int(hex_str[2:6], 16)
    total_content = b''
    while content_length > len(total_content):
        content = ssl_sock.recv(4096)
        total_content += content
    return total_content
def __base_request(request_body, ssl_sock=None):
    """
    """
    if not ssl_sock:
        host, port = "hwsockapp.longhuvip.com", 14000
        sock = socket.create_connection((host, port))
        sock.settimeout(2)
        context = ssl.create_default_context()
        # 不验证证书
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        ssl_sock = context.wrap_socket(sock, server_hostname=host)
        try:
            # 注册使用
            register_body = "30 00 65 00 02 02 59 10 00 00 0A 24 62 36 39 32 65 35 31 63 2D 31 62 63 34 2D 33 65 38 63 2D 61 30 31 62 2D 36 32 30 61 61 36 32 34 30 65 32 38 10 01 1A 08 35 2E 32 30 2E 30 2E 36 20 01 2A 20 63 65 30 36 34 63 66 38 62 35 34 62 62 66 35 30 63 33 66 36 62 37 36 63 30 38 38 64 64 63 33 36 32 01 30 3A 01 30 40 63"
            ssl_sock.sendall(bytes.fromhex(register_body))
            __recieve_data(ssl_sock)
            __recieve_data(ssl_sock)
        except Exception as e:
            raise e
    ssl_sock.sendall(bytes.fromhex(request_body))
    response = __recieve_data(ssl_sock)
    hex_str = codecs.encode(response, 'hex').decode('utf-8').upper()
    hex_array = []
    for i in range(0, len(hex_str), 2):
        str_ = hex_str[i: i + 2]
        hex_array.append(str_)
    return " ".join(hex_array), ssl_sock
def get_limit_up_list():
    # 获取涨停列表
    pages = ["400014006F0837080003020837080110061801281E3801",
             "400016008B08370800030208370801100618012011282B3801",
             "400016008C0837080003020837080110061801202F282B3801",
             "400016008D0837080003020837080110061801202F282B3801",
             "400016008E0837080003020837080110061801202F282B3801",
             "400016008F0837080003020837080110061801202F282B3801"
             ]
    limit_up_list = []
    ssl_sock = None
    day = None
    for page in pages:
        # print("======================")
        result, ssl_sock = __base_request(page, ssl_sock)
        fresults, day = DataParseUtil.parse_limit_up_data(result)
        limit_up_list.extend(fresults)
        if len(fresults) < 30:
            break
    seen = set()
    # 去重
    unique_limit_up_list = [x for x in limit_up_list if x[0] not in seen and not seen.add(x[0])]
    unique_limit_up_list.sort(key=lambda e: int(e[6], ), reverse=True)
    # 格式化数据
    format_limit_up_data_list = []
    if unique_limit_up_list:
        for d in unique_limit_up_list:
            format_data = [0] * 21
            format_data[0], format_data[1], format_data[3], format_data[4], format_data[5] = d[0], d[1], '', int(d[6]), \
                                                                                             d[8]
            format_data[6], format_data[7], format_data[8] = int(d[10]), int(d[14]), int(d[11])
            format_data[12], format_data[13], format_data[14] = d[9], int(d[13]), round(float(d[12]), 2)
            format_data[18], format_data[19], format_data[20] = d[7], d[17], int(d[16])
            format_limit_up_data_list.append(format_data)
    fdata = {"errcode": 0, "list": format_limit_up_data_list, "day": day}
    return fdata
if __name__ == "__main__":
    result = get_limit_up_list()
    print(len(result["list"]))
trade/huaxin/huaxin_trade_record_manager.py
@@ -6,6 +6,7 @@
import copy
import datetime
import json
import queue
from db.redis_manager_delegate import RedisUtils, RedisManager
from utils import tool, huaxin_util
@@ -19,6 +20,7 @@
    __current_delegate_records_dict_cache = {}
    mysqldb = mysql_data.Mysqldb()
    __instance = None
    __queue = queue.Queue(maxsize=4096)
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -57,12 +59,7 @@
            pass
    @classmethod
    def add_one(cls, d):
        if huaxin_util.is_can_cancel(str(d["orderStatus"])):
            cls.__current_delegate_records_dict_cache[d['orderSysID']] = d
        else:
            if d['orderSysID'] in cls.__current_delegate_records_dict_cache:
                cls.__current_delegate_records_dict_cache.pop(d['orderSysID'])
    def __add_one(cls, d):
        # 查询是否有数据
        _id = f"{d['insertDate']}-{d['orderLocalID']}"
        result = cls.mysqldb.select_one(
@@ -114,6 +111,16 @@
                    f"update hx_trade_delegate_record set {','.join(where_list)} where id='{result[0]}'")
    @classmethod
    def add_one(cls, d):
        if huaxin_util.is_can_cancel(str(d["orderStatus"])):
            cls.__current_delegate_records_dict_cache[d['orderSysID']] = d
        else:
            if d['orderSysID'] in cls.__current_delegate_records_dict_cache:
                cls.__current_delegate_records_dict_cache.pop(d['orderSysID'])
        # pass
        cls.__queue.put_nowait(d)
    @classmethod
    def list_by_day(cls, day, min_update_time, orderStatus=[]):
        mysqldb = mysql_data.Mysqldb()
        try:
@@ -149,6 +156,15 @@
            return fresults, max_update_time
        finally:
            pass
    @classmethod
    def run(cls):
        while True:
            try:
                data = cls.__queue.get()
                cls.__add_one(data)
            except:
                pass
# 持仓记录
@@ -485,4 +501,4 @@
if __name__ == "__main__":
    print(DelegateRecordManager().list_current_delegates("600239"))
    pass