Administrator
2023-07-31 b9dcb0b8ef3501395d4a2624c2adcf09d725e09c
增加些实时参数的缓存
2个文件已修改
1个文件已添加
414 ■■■■ 已修改文件
code_attribute/first_target_code_data_processor.py 205 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 191 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py
New file
@@ -0,0 +1,205 @@
"""
首板目标代码处理器
"""
# 处理首板代码信息
import logging
import constant
import inited_data
from code_attribute import gpcode_manager, gpcode_first_screen_manager, global_data_loader, code_nature_analyse, \
    code_volumn_manager
from code_attribute.code_data_util import ZYLTGBUtil
from log_module.log import logger_first_code_record
from third_data import kpl_api, block_info
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager
from ths import l2_code_operate
from trade import trade_data_manager
from utils import global_util, tool
# Module-level manager instance, used in this module to read and store each
# code's historical limit-up reason and its block (plate) list.
__CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
def _append_missing_want_codes(dataList, codes):
    """Append want-to-buy codes that are absent from the incoming quotes.

    For every want-to-buy code not already in ``codes`` a placeholder quote
    priced at the limit-up price is appended to ``dataList`` (in place) and the
    code is appended to ``codes`` (in place). Codes whose limit-up price cannot
    be obtained, even after refreshing the previous-close data, are skipped.
    """
    want_codes = gpcode_manager.WantBuyCodesManager.list_code()
    if not want_codes:
        return
    for code in set(want_codes) - set(codes):
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if not limit_up_price:
            # Refresh the previous-close data once, then retry.
            inited_data.re_set_price_pres([code], True)
            limit_up_price = gpcode_manager.get_limit_up_price(code)
        if not limit_up_price:
            continue
        codes.append(code)
        # Placeholder quote; "00:00:00" is treated as "no limit-up time" later.
        dataList.append({"code": code, "price": f"{limit_up_price}", "volume": "0",
                         "volumeUnit": 0, "time": "00:00:00", "zyltgb": "100",
                         "zyltgbUnit": 0})


def _save_missing_zyltgb(dataList):
    """Persist the ``zyltgb`` fields of codes not yet in ``global_util.zyltgb_map``."""
    zyltgb_list = [
        {"code": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]}
        for data in dataList
        if data["code"] not in global_util.zyltgb_map
    ]
    if zyltgb_list:
        ZYLTGBUtil.save_list(zyltgb_list)
        # Reload so the in-memory map reflects what was just saved.
        global_data_loader.load_zyltgb()


def _prepare_plate_keys(codes):
    """Make sure the history limit-up reason and block list exist for each code."""
    for code in codes:
        if __CodesPlateKeysManager.get_history_limit_up_reason(code) is None:
            # Load the latest recorded blocks as the historical limit-up reason.
            __CodesPlateKeysManager.set_history_limit_up_reason(
                code, KPLLimitUpDataRecordManager.get_latest_blocks_set(code))
        if __CodesPlateKeysManager.get_blocks(code) is None:
            try:
                results = kpl_api.getStockIDPlate(code)
                __CodesPlateKeysManager.set_blocks(code, [r[1] for r in results])
            except Exception as e:
                # Best effort: a failed lookup must not abort the whole batch.
                logging.exception(e)


def _load_volume_history(codes):
    """Fetch and store volume history / nature records for codes that lack them."""
    for code in codes:
        need_get_volumn = (global_util.max60_volumn.get(code) is None
                           or code_nature_analyse.CodeNatureRecordManager.get_nature(code) is None)
        if not need_get_volumn:
            continue
        volumes_data = inited_data.get_volumns_by_code(code, 150)
        recent_volumes = volumes_data[:90]
        is_new_top = code_nature_analyse.is_new_top(gpcode_manager.get_limit_up_price(code),
                                                    recent_volumes)
        volumes = inited_data.parse_max_volume(recent_volumes, is_new_top)
        logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
        code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
        # NOTE: the K-line format check (is_has_k_format) is intentionally not
        # applied here; only the nature record is stored.
        code_nature_analyse.set_record_datas(code,
                                             gpcode_manager.get_limit_up_price(code),
                                             volumes_data)


def _collect_ticks_and_limit_up_prices(dataList):
    """Build the tick list and the ``code -> limit-up price`` mapping.

    Returns:
        tuple: ``(tick_datas, limit_up_price_dict)``. Codes whose limit-up price
        is still unknown after one refresh are absent from the mapping.
    """
    tick_datas = []
    limit_up_price_dict = {}
    pending_codes = []
    for data in dataList:
        code = data["code"]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is not None:
            limit_up_price_dict[code] = limit_up_price
        else:
            pending_codes.append(code)
        tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
                           "volumeUnit": data["volumeUnit"]})
    if pending_codes:
        # Refresh previous-close data in one batch, then retry each code.
        inited_data.re_set_price_pres(pending_codes)
        for code in pending_codes:
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            if limit_up_price is not None:
                limit_up_price_dict[code] = limit_up_price
    return tick_datas, limit_up_price_dict


def _build_first_board_prices(dataList, limit_up_price_dict, new_add_codes):
    """Turn raw quotes into the rate / limit-up records for the screen manager."""
    prices = []
    for data in dataList:
        code = data["code"]
        if code not in limit_up_price_dict:
            continue
        price = data["price"]
        limit_up_time = data["time"]
        if limit_up_time == "00:00:00":
            limit_up_time = None
        is_limit_up = abs(float(limit_up_price_dict[code]) - float(price)) < 0.01
        if is_limit_up:
            # Correct the data: a limit-up quote without a time gets "now".
            if limit_up_time is None:
                limit_up_time = tool.get_now_time_str()
            gpcode_manager.FirstCodeManager.add_limited_up_record([code])
        price_pre = gpcode_manager.get_price_pre(code)
        if not price_pre:
            # The value must be re-read after the refresh; otherwise the rate
            # computation below fails on None and aborts the whole batch.
            inited_data.re_set_price_pres([code])
            price_pre = gpcode_manager.get_price_pre(code)
        if not price_pre:
            logging.warning("previous close unavailable for %s, skip rate calculation", code)
            continue
        rate = round((float(price) - price_pre) * 100 / price_pre, 1)
        prices.append({"code": code, "time": limit_up_time, "rate": rate,
                       "limit_up": is_limit_up})
        if is_limit_up and new_add_codes and code in new_add_codes:
            place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
            if place_order_count == 0:
                trade_data_manager.placeordercountmanager.place_order(code)
    return prices


def process_first_codes_datas(dataList):
    """Process a batch of first-board target-code quotes.

    Steps, in order: add missing want-to-buy codes, register the unscreened
    target codes, persist ``zyltgb`` values, make sure limit-up prices, plate
    keys and volume history are loaded, register newly added codes, and finally
    hand the computed rate / limit-up records to
    ``gpcode_first_screen_manager.process_ticks``.

    Args:
        dataList: list of quote dicts. Each dict is read for the keys "code",
            "price", "volume", "volumeUnit", "time", "zyltgb" and "zyltgbUnit".
            The list is extended in place with placeholder quotes for
            want-to-buy codes that were missing from it.

    Returns:
        list: one ``{"code", "price", "volume", "volumeUnit"}`` dict per entry
        of ``dataList`` (including the appended placeholders).
    """
    print("首板代码数量:", len(dataList))
    codes = [data["code"] for data in dataList]

    # Want-to-buy codes must be in the list even if no quote was collected.
    _append_missing_want_codes(dataList, codes)

    # Save the not-yet-screened first-board codes.
    new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)

    _save_missing_zyltgb(dataList)

    # A missing limit-up price means the previous close has to be loaded.
    for code in codes:
        if gpcode_manager.get_limit_up_price(code) is None:
            inited_data.re_set_price_pres([code], True)

    _prepare_plate_keys(codes)
    _load_volume_history(codes)

    gpcode_manager.FirstCodeManager.add_record(codes)
    # Initialise block information.
    for code in codes:
        block_info.init_code(code)

    if new_add_codes:
        latest_info = HistoryKDatasUtils.get_gp_latest_info(
            codes, fields="symbol,sec_name,sec_type,sec_level")
        gpcode_manager.set_first_gp_codes_with_data(latest_info)
        logger_first_code_record.info("新增首板:{}", new_add_codes)
        if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
            # Stop listening to codes that are no longer in the pool.
            for lc in gpcode_manager.get_listen_codes():
                if not gpcode_manager.is_in_gp_pool(lc):
                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")

    tick_datas, limit_up_price_dict = _collect_ticks_and_limit_up_prices(dataList)
    prices = _build_first_board_prices(dataList, limit_up_price_dict, new_add_codes)
    gpcode_first_screen_manager.process_ticks(prices)
    return tick_datas
l2/huaxin/huaxin_target_codes_manager.py
@@ -8,6 +8,7 @@
from code_attribute.code_data_util import ZYLTGBUtil
from db import redis_manager
from third_data import kpl_data_manager, kpl_api
from trade import current_price_process_manager
from utils import tool, global_util, socket_util
redisManager = redis_manager.RedisManager(0)
@@ -89,23 +90,8 @@
            fitem = {"code": code, "price": d[1], "volume": d[3] // (10000*100), "volumeUnit": 1, "time": "00:00:00",
                     "zyltgb": zyltgb // 10000, "zyltgbUnit": 1}
            flist.append(fitem)
        fdata = {"type": 22, "data": flist}
        print("首板代码数量", len(flist))
        addr, port = "127.0.0.1", 9001
        sk = socket.socket()  # 生成socket,连接server
        sk.connect((addr, port))
        try:
            sk.sendall(socket_util.load_header(json.dumps(fdata).encode('utf-8')))
        finally:
            sk.close()
        sk = socket.socket()  # 生成socket,连接server
        sk.connect((addr, port))
        try:
            sk.sendall(socket_util.load_header(json.dumps({"type": 40, "data": []}).encode('utf-8')))
        finally:
            sk.close()
        current_price_process_manager.accept_prices(flist)
if __name__ == "__main__":
server.py
@@ -12,7 +12,7 @@
from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util
from code_attribute import code_volumn_manager, code_nature_analyse, global_data_loader, gpcode_manager, \
    gpcode_first_screen_manager
    gpcode_first_screen_manager, first_target_code_data_processor
import constant
from user import authority
import inited_data
@@ -275,197 +275,10 @@
                            raise Exception('未到接受时间')
                        # 首板代码
                        dataList, is_add = data_process.parseGPCode(_str)
                        print("首板代码数量:", len(dataList))
                        limit_up_price_dict = {}
                        temp_codes = []
                        codes = []
                        tick_datas = []
                        if dataList:
                            for data in dataList:
                                code = data["code"]
                                codes.append(code)
                        # ---查询想买单,如果没有在列表中就需要强行加入列表
                        want_codes = gpcode_manager.WantBuyCodesManager.list_code()
                        if want_codes:
                            # 没有在现价采集中的想买代码
                            diff_codes = set(want_codes) - set(codes)
                            if diff_codes:
                                zyltgb_list = []
                                for code in diff_codes:
                                    # 查询是否在L2现价中
                                    if code in self.__l2_current_price_data:
                                        item = self.__l2_current_price_data.get(code)
                                        codes.append(code)
                                        dataList.append(item)
                                        # 保存自由流通股本
                                        zyltgb_list.append(
                                            {"code": code, "zyltgb": item["zyltgb"], "zyltgb_unit": item["zyltgbUnit"]})
                                    else:
                                        # 获取涨停价
                                        _limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if not _limit_up_price:
                                            inited_data.re_set_price_pres([code], True)
                                            # 再次获取涨停价
                                            _limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if _limit_up_price:
                                            # 成功获取到了涨停价,构造虚拟的现价信息
                                            codes.append(code)
                                            dataList.append({"code": code, "price": f"{_limit_up_price}", "volume": "0",
                                                             "volumeUnit": 0, "time": "00:00:00", "zyltgb": "100",
                                                             "zyltgbUnit": 0})
                                # 强制更新自由流通股本
                                if zyltgb_list:
                                    ZYLTGBUtil.save_list(zyltgb_list)
                                    # 将保存的数据更新到内存中
                                    for z in zyltgb_list:
                                        val = ZYLTGBUtil.get(z["code"])
                                        if val:
                                            global_util.zyltgb_map[z["code"]] = val
                        # ---保存未筛选的首板代码
                        new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
                        # 保存自由流通股本
                        if dataList:
                            zyltgb_list = []
                            for data in dataList:
                                code = data["code"]
                                if code in global_util.zyltgb_map:
                                    continue
                                zyltgb_list.append(
                                    {"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]})
                            if zyltgb_list:
                                ZYLTGBUtil.save_list(zyltgb_list)
                                global_data_loader.load_zyltgb()
                        bad_codes = set()
                        # 获取昨日收盘价
                        for code in codes:
                            # 如果涨停价是空值就需要设置昨日收盘价格
                            if gpcode_manager.get_limit_up_price(code) is None:
                                inited_data.re_set_price_pres([code], True)
                        # 板块关键字准备
                        for code in codes:
                            if self.__CodesPlateKeysManager.get_history_limit_up_reason(code) is None:
                                self.__CodesPlateKeysManager.set_history_limit_up_reason(code,
                                                                                         KPLLimitUpDataRecordManager.get_latest_blocks_set(
                                                                                             code))
                            if self.__CodesPlateKeysManager.get_blocks(code) is None:
                                try:
                                    results = kpl_api.getStockIDPlate(code)
                                    bs = [r[1] for r in results]
                                    self.__CodesPlateKeysManager.set_blocks(code, bs)
                                except Exception as e:
                                    logging.exception(e)
                                    pass
                        # 获取60天最大记录
                        for code in codes:
                            need_get_volumn = False
                            if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
                                need_get_volumn = True
                            if not need_get_volumn and code_nature_analyse.CodeNatureRecordManager.get_nature(
                                    code) is None:
                                need_get_volumn = True
                            if need_get_volumn:
                                volumes_data = inited_data.get_volumns_by_code(code, 150)
                                volumes = inited_data.parse_max_volume(volumes_data[:90],
                                                                       code_nature_analyse.is_new_top(
                                                                           gpcode_manager.get_limit_up_price(code),
                                                                           volumes_data[:90]))
                                logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
                                code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
                                # 判断K线形态
                                is_has_k_format, msg = code_nature_analyse.is_has_k_format(
                                    gpcode_manager.get_limit_up_price(code), volumes_data)
                                if not is_has_k_format:
                                    logger_first_code_record.info("{}首板K线形态不好,{}", code, msg)
                                    # 股性不好,就不要加入
                                    bad_codes.add(code)
                                    # 加入禁止交易代码
                                    l2_trade_util.forbidden_trade(code)
                                code_nature_analyse.set_record_datas(code,
                                                                     gpcode_manager.get_limit_up_price(code),
                                                                     volumes_data)
                        gpcode_manager.FirstCodeManager.add_record(codes)
                        # 初始化板块信息
                        for code in codes:
                            block_info.init_code(code)
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(HistoryKDatasUtils.get_gp_latest_info(codes,
                                                                                                              fields="symbol,sec_name,sec_type,sec_level"))
                            # 加入首板历史记录
                            logger_first_code_record.info("新增首板:{}", new_add_codes)
                            # 移除代码
                            listen_codes = gpcode_manager.get_listen_codes()
                            for lc in listen_codes:
                                if not gpcode_manager.is_in_gp_pool(lc):
                                    # 移除代码
                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
                        # 保存现价
                        if dataList:
                            for data in dataList:
                                code = data["code"]
                                codes.append(code)
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                                else:
                                    temp_codes.append(code)
                                tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
                                                   "volumeUnit": data["volumeUnit"]})
                        # 获取涨停价
                        if temp_codes:
                            # 获取涨停价
                            inited_data.re_set_price_pres(temp_codes)
                            # 重新获取涨停价
                            for code in temp_codes:
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    limit_up_price_dict[code] = limit_up_price
                        tick_datas =  first_target_code_data_processor.process_first_codes_datas(dataList)
                        # 保存现价
                        self.first_tick_datas.clear()
                        self.first_tick_datas.extend(tick_datas)
                        # 首板数据加工
                        prices = []
                        for data in dataList:
                            code = data["code"]
                            price = data["price"]
                            limit_up_time = data["time"]
                            if limit_up_time == "00:00:00":
                                limit_up_time = None
                            if code not in limit_up_price_dict:
                                continue
                            is_limit_up = abs(float(limit_up_price_dict[code]) - float(price)) < 0.01
                            # 纠正数据
                            if is_limit_up and limit_up_time is None:
                                limit_up_time = tool.get_now_time_str()
                            if is_limit_up:
                                # 加入首板涨停
                                gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                            pricePre = gpcode_manager.get_price_pre(code)
                            if pricePre is None:
                                inited_data.re_set_price_pres([code])
                            rate = round((float(price) - pricePre) * 100 / pricePre, 1)
                            prices.append(
                                {"code": code, "time": limit_up_time, "rate": rate,
                                 "limit_up": is_limit_up})
                            if code in new_add_codes:
                                if is_limit_up:
                                    place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
                                        code)
                                    if place_order_count == 0:
                                        trade_data_manager.placeordercountmanager.place_order(code)
                        gpcode_first_screen_manager.process_ticks(prices)
                    except Exception as e:
                        logging.exception(e)
                    finally: