Administrator
2023-12-13 11a349588bd3a277ef87ff186cb338093c7287f3
09:25之前订阅持仓代码的L2数据/修改代码板块的获取接口
8个文件已修改
110 ■■■■ 已修改文件
huaxin_client/l2_data_manager.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/l2_huaxin_util.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py
@@ -68,7 +68,6 @@
                return item
            return None
        return item
        # 过滤订单
    def __filter_transaction(self, item):
l2/huaxin/l2_huaxin_util.py
@@ -53,7 +53,8 @@
# 处理l2数据
def __format_l2_data(origin_datas, code, limit_up_price):
# filter_not_limit_up : 过滤掉非涨停数据
def __format_l2_data(origin_datas, code, limit_up_price, filter_not_limit_up=True):
    datas = []
    dataIndexs = {}
    same_time_num = {}
@@ -78,7 +79,8 @@
            item["limitPrice"] = "{}".format(limitPrice)
        operateType = item["operateType"]
        # 不需要非涨停买与买撤
        if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1) and num != 1:
        if filter_not_limit_up and int(item["limitPrice"]) != 1 and (
                int(operateType) == 0 or int(operateType) == 1) and num != 1:
            continue
        key = "{}-{}-{}".format(code, item["mainSeq"], item["subSeq"])
        if key in dataIndexs:
@@ -94,7 +96,11 @@
def get_format_l2_datas(code, origin_datas, limit_up_price, start_index):
    """Convert raw L2 records, format them, and number them from ``start_index``.

    :param code: security code, forwarded to ``__format_l2_data``
    :param origin_datas: iterable of raw L2 records; may be empty
    :param limit_up_price: limit-up price of the code; converted with ``float``
    :param start_index: value stored under ``"index"`` for the first formatted record;
        later records get consecutive values
    :return: the list produced by ``__format_l2_data`` with ``"index"`` set on every item
    """
    limit_up = float(limit_up_price)
    # Convert the raw records to the internal format first.
    datas = [__convert_order(x, limit_up) for x in origin_datas]
    # Up to and including the 09:25 minute, records that are not at the limit-up
    # price are kept; the decision is taken from the first record's "time" field
    # (assumes an "HH:MM..." string — confirm against __convert_order).
    # An empty batch has no first record, so it keeps the default filtering
    # instead of raising IndexError.
    filter_not_limit_up = True
    if datas and int(datas[0]["time"][:5].replace(":", "")) <= 925:
        filter_not_limit_up = False
    # Format exactly once; an earlier unconditional call whose result was
    # immediately overwritten has been dropped.
    fdatas = __format_l2_data(datas, code, limit_up, filter_not_limit_up=filter_not_limit_up)
    for offset, item in enumerate(fdatas):
        item["index"] = start_index + offset
    return fdatas
output/code_info_output.py
@@ -332,9 +332,14 @@
                type = record[1]
                data = record[2]
                if type == trade_record_log_util.TYPE_PLACE_ORDER:
                    records_new_data.append((time_, "开盘啦推荐原因",
                                             f"{'、'.join([k[1] for k in data['kpl_blocks']])}",
                                             None))
                    if data['kpl_blocks'] and (type(data['kpl_blocks'][0]) == list or type(data['kpl_blocks'][0]) == tuple):
                        records_new_data.append((time_, "开盘啦推荐原因",
                                                 f"{'、'.join([k[1] for k in data['kpl_blocks']])}",
                                                 None))
                    else:
                        records_new_data.append((time_, "开盘啦推荐原因",
                                                 f"{'、'.join(data['kpl_blocks'])}",
                                                 None))
                    if "kpl_match_blocks" in data:
                        if data["kpl_match_blocks"]:
                            records_new_data.append((time_, "匹配原因",
test/l2_trade_test.py
@@ -222,7 +222,7 @@
    # @unittest.skip("跳过此单元测试")
    def test_block(self):
        code = "603918"
        code = "600713"
        KPLCodeJXBlockManager().load_jx_blocks(code, 23.52,23.62,
                                               kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reasons())
third_data/code_plate_key_manager.py
@@ -43,11 +43,8 @@
        final_blocks = copy.deepcopy(blocks)
        if len(blocks) > 2:
            final_blocks.clear()
            # 根据涨幅排序
            blocks.sort(key=lambda x: x[2])
            blocks.reverse()
            for b in blocks:
                if b[1] not in constant.KPL_INVALID_BLOCKS:
                if b not in constant.KPL_INVALID_BLOCKS:
                    final_blocks.append(b)
            if len(final_blocks) < 2:
                final_blocks = blocks
@@ -100,7 +97,7 @@
                if price_rate > 0.07:
                    jx_blocks_info = self.get_jx_blocks_cache(code)
                    if not jx_blocks_info:
                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                        blocks = kpl_api.getCodeBlocks(code)
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                        async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块-{blocks}")
                    else:
@@ -118,14 +115,14 @@
                            if time.time() - jx_blocks_info[1] > UPDATE_TIME_SPACE:
                                # 距离上次更新时间过去了5分钟
                                blocks = kpl_api.getCodeJingXuanBlocks(code)
                                blocks = kpl_api.getCodeBlocks(code)
                                self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                                async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(更新)-{blocks}")
                elif price_rate > 0.03:
                    # 添加备用板块
                    if not self.get_jx_blocks_cache(code, by=True):
                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                        blocks = kpl_api.getCodeBlocks(code)
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks, by=True)
                        async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(备用)-{blocks}")
        except Exception as e:
@@ -450,7 +447,7 @@
            jingxuan_block_info = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True)
        if jingxuan_block_info:
            jingxuan_blocks = jingxuan_block_info[0]
            k4 |= set([x[1] for x in jingxuan_blocks])
            k4 |= jingxuan_blocks  #set([x[1] for x in jingxuan_blocks])
        for k in [k1, k11, k2, k3, k4]:
            keys |= k
third_data/kpl_api.py
@@ -135,8 +135,53 @@
    return None
# 获取F10中的精选板块
def __getConceptJXBK(code):
    """Request the F10 "selected" (JingXuan) concept blocks of a stock and return their names.

    :param code: stock code placed into the ``StockID`` field of the request body
    :return: list of the ``CName`` values found under ``List`` in the decoded response,
        or an empty list when the response is empty or has no ``List`` entry
    """
    url = "https://apparticle.longhuvip.com/w1/api/index.php"
    payload = f"a=GetConceptJXBKw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&"
    response = json.loads(__base_request(url, data=payload, timeout=3))
    # Nothing usable came back: empty payload or no block list in it.
    if not response or "List" not in response:
        return []
    return [entry["CName"] for entry in response["List"]]
# 获取F10常规板块
def __getConceptBK(code):
    """Request the regular F10 concept blocks of a stock and return their names.

    :param code: stock code placed into the ``StockID`` field of the request body
    :return: list of the ``CName`` values found under ``List`` in the decoded response,
        or an empty list when the response is empty or has no ``List`` entry
    """
    query = f"a=GetConceptw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&"
    raw = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=query, timeout=3)
    parsed = json.loads(raw)
    # Only read the list when the decoded response is non-empty and carries one.
    has_list = bool(parsed) and "List" in parsed
    return [row["CName"] for row in parsed["List"]] if has_list else []
# 获取代码的板块
def getCodeBlocks(code):
    """Return the de-duplicated block names of a stock code.

    Combines the result of ``__getConceptJXBK`` (selected F10 blocks) with the
    result of ``__getConceptBK`` (regular F10 blocks). Each lookup is
    best-effort: if one raises, its names are skipped and the other lookup is
    still attempted.

    :param code: stock code passed to both lookups
    :return: list of unique block names; built through a ``set``, so the order
        is not guaranteed. Empty when both lookups fail or return nothing.
    """
    blocks = []
    try:
        _bks = __getConceptJXBK(code)
        if _bks:
            blocks.extend(_bks)
    except Exception:
        # Best-effort lookup: ignore request/parse failures, but let
        # KeyboardInterrupt and SystemExit propagate (a bare except would not).
        pass
    try:
        _bks = __getConceptBK(code)
        if _bks:
            blocks.extend(_bks)
    except Exception:
        # Same best-effort policy as above.
        pass
    return list(set(blocks))
if __name__ == "__main__":
    blocks = getCodeJingXuanBlocks("002827")
    blocks = getCodeBlocks("600713")
    blocks1 = getCodeJingXuanBlocks("600713")
    if len(blocks) > 2:
        # 根据涨幅排序
        blocks.sort(key=lambda x: x[2])
trade/huaxin/huaxin_trade_api_server.py
@@ -538,7 +538,7 @@
    logger_system.info("create TradeApiServer")
    logger_system.info(f"trade_api_server 线程ID:{tool.get_thread_id()}")
    # 拉取交易信息
    huaxin_trade_data_update.run(queue_l1_r_strategy_w)
    huaxin_trade_data_update.run(queue_l1_r_strategy_w, queue_other_w_l2_r)
    #
    t1 = threading.Thread(target=lambda: __set_target_codes(queue_other_w_l2_r), daemon=True)
    t1.start()
trade/huaxin/huaxin_trade_data_update.py
@@ -1,6 +1,7 @@
"""
华鑫交易数据更新
"""
import json
import logging
import queue
import threading
@@ -12,6 +13,7 @@
from log_module.log import hx_logger_trade_debug, logger_system
from trade import trade_manager
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager
from trade.huaxin.huaxin_trade_api import ClientSocketManager
from trade.huaxin.huaxin_trade_order_processor import HuaxinOrderEntity, TradeResultProcessor
from utils import huaxin_util, tool, init_data_util
import concurrent.futures
@@ -116,7 +118,20 @@
                                    position_codes.add(d["securityID"])
                            queue_l1_r_strategy_w.put_nowait(
                                {"type": "set_position_codes", "data": list(position_codes)})
                            # 9点25之前需要订阅持仓票
                            if position_codes and tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") < 0:
                                try:
                                    # 如果有持仓票
                                    l2_subscript_datas = []
                                    for code in position_codes:
                                        l2_subscript_datas.append(
                                            (code, float(gpcode_manager.get_limit_up_price(code)), 10, 0, time.time()))
                                    # 在9:25之前订阅持仓票的L2
                                    root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                                 "data": l2_subscript_datas}
                                    queue_other_w_l2_r.put_nowait(json.dumps(root_data))
                                except Exception as e:
                                    raise e
                            __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, datas)
                    async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}")
                except Exception as e1:
@@ -153,8 +168,9 @@
# 运行
def run(queue_l1_r_strategy_w_):
    global queue_l1_r_strategy_w
def run(queue_l1_r_strategy_w_, queue_other_w_l2_r_):
    global queue_l1_r_strategy_w, queue_other_w_l2_r
    queue_l1_r_strategy_w = queue_l1_r_strategy_w_
    queue_other_w_l2_r = queue_other_w_l2_r_
    t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
    t1.start()