Administrator
2024-06-07 1b49e9a1c5d31a568406a361b883eec211326983
自由流通量相关测试
14个文件已修改
275 ■■■■■ 已修改文件
code_attribute/code_data_util.py 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/global_data_loader.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_log.py 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_util.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/global_util.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_data_util.py
@@ -8,8 +8,10 @@
from code_attribute import gpcode_manager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from utils import tool
from utils.tool import async_call
__db = 0
_redisManager = redis_manager.RedisManager(0)
@@ -31,6 +33,7 @@
# 自由流通股本工具类
class ZYLTGBUtil:
    __db = 0
    __mysql = Mysqldb()
    @classmethod
    def save(cls, code, val, unit):
@@ -39,10 +42,51 @@
                             float(val) * 10000))
    @classmethod
    def save_async(cls, code, val, unit):
    def save_async(cls, code, val, price):
        """
        异步保存自由流通股本
        @param code:
        @param val:
        @param price:
        @return:
        """
        RedisUtils.setex_async(cls.__db, "zyltgb-{}".format(code), tool.get_expire(),
                               round(float(val) * 100000000) if int(unit) == 0 else round(
                                   float(val) * 10000))
                               val)
        if price > 0:
            zylt_volume = int(int(val) / float(price))
            cls.save_volume(code, zylt_volume)
    @classmethod
    def save_volume(cls, code, zylt_volume):
        result = cls.__mysql.select_one(f"select * from kpl_zylt_volume where code = {code}")
        if result:
            cls.__mysql.execute(
                f"update kpl_zylt_volume set zylt_volume = {zylt_volume}, update_time=now() where code = {code}")
        else:
            cls.__mysql.execute(
                f"insert into kpl_zylt_volume(code, zylt_volume, create_time, update_time) values ('{code}',{zylt_volume},now(),now())")
    @classmethod
    def get_today_updated_volume_codes(cls):
        """
        获取今日已经更新量的代码
        @return:
        """
        fresults = cls.__mysql.select_all(f"select code from kpl_zylt_volume where update_time >= '{tool.get_now_date_str()}'")
        if fresults:
            return [x[0] for x in fresults]
        return []
    @classmethod
    def count_today_updated_volume_codes(cls):
        """
        查询今日已经更新量的个数
        @return:
        """
        fresults = cls.__mysql.select_one(f"select count(code) from kpl_zylt_volume where update_time >= '{tool.get_now_date_str()}'")
        return fresults[0]
    @classmethod
    def get(cls, code):
@@ -51,27 +95,7 @@
            return int(val)
        return None
    @classmethod
    def save_list(self, datasList):
        # 保存自由流通市值
        mysqldb = mysql_data.Mysqldb()
        for data in datasList:
            # 保存
            _dict = {"_id": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgb_unit"],
                     "update_time": int(round(time.time() * 1000))}
            if float(data["zyltgb"]) > 0:
                # 保存10天
                ZYLTGBUtil.save(data["code"], data["zyltgb"], data["zyltgb_unit"])
                result = mysqldb.select_one("select * from ths_zylt where _id='{}'".format(data["code"]))
                if result is None:
                    mysqldb.execute(
                        "insert into ths_zylt(_id,zyltgb,zyltgb_unit,update_time) values ('{}','{}',{},{})".format(
                            data["code"], data["zyltgb"], data["zyltgb_unit"], round(time.time() * 1000)))
                else:
                    mysqldb.execute(
                        "update ths_zylt set zyltgb='{}',zyltgb_unit={},update_time={} where _id='{}'".format(
                            data["zyltgb"], data["zyltgb_unit"], round(time.time() * 1000), data["code"]))
if __name__ == "__main__":
    pass
    #ZYLTGBUtil.save_async("000333", 256222112, 65.54)
    print(ZYLTGBUtil.count_today_updated_volume_codes())
code_attribute/first_target_code_data_processor.py
@@ -60,7 +60,6 @@
        diff_codes = set(want_codes) - set(codes)
        if diff_codes:
            # 想买单的代码还没有在目标代码中
            zyltgb_list = []
            for code in diff_codes:
                # 获取涨停价
                _limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -74,14 +73,6 @@
                    dataList.append({"code": code, "price": f"{_limit_up_price}", "volume": "0",
                                     "volumeUnit": 0, "time": "00:00:00", "zyltgb": "100",
                                     "zyltgbUnit": 0})
            # 强制更新自由流通股本
            if zyltgb_list:
                ZYLTGBUtil.save_list(zyltgb_list)
                # 将保存的数据更新到内存中
                for z in zyltgb_list:
                    val = ZYLTGBUtil.get(z["code"])
                    if val:
                        global_util.zyltgb_map[z["code"]] = val
    # ---保存未筛选的首板代码
    new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
code_attribute/global_data_loader.py
@@ -1,4 +1,5 @@
from code_attribute import code_volumn_manager, gpcode_manager
from db.mysql_data_delegate import Mysqldb
from utils import global_util, ths_industry_util
from code_attribute.code_data_util import ZYLTGBUtil
@@ -26,6 +27,15 @@
        result = ZYLTGBUtil.get(code)
        if result is not None:
            global_util.zyltgb_map[code] = result
def load_zyltgb_volume_from_db():
    """
    Load the free-float volume of every code from ``kpl_zylt_volume`` into
    ``global_util.zylt_volume_map`` (first column -> second column of each row).

    Nothing is changed when the query returns no rows.
    """
    rows = Mysqldb().select_all("select code, zylt_volume from kpl_zylt_volume")
    if not rows:
        return
    volume_map = global_util.zylt_volume_map
    for row in rows:
        volume_map[row[0]] = row[1]
# 加载名称代码隐射
@@ -61,4 +71,4 @@
    for data in datas:
        __dict[data["code"]] = data
    # print(__dict)
    global_util.today_limit_up_codes.update(__dict)
    global_util.today_limit_up_codes.update(__dict)
code_attribute/gpcode_manager.py
@@ -7,6 +7,8 @@
import constant
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module import log_export
from log_module.log import logger_pre_close_price
from utils import tool
import decimal
@@ -710,9 +712,9 @@
    # 获取收盘价
    @classmethod
    def get_price_pre(cls, code):
        result = RedisUtils.get(cls.__redisManager.getRedis(), "price-pre-{}".format(code))
        if result is not None:
            return float(result)
        fdatas = log_export.load_pre_close_price()
        if code in fdatas:
            return round(float(fdatas.get(code)), 2)
        return None
    # 获取缓存
@@ -731,8 +733,9 @@
        codes = get_gp_list()
        if code not in codes and not FirstCodeManager().is_in_first_record_cache(code) and not force:
            return
        RedisUtils.setex(cls.__redisManager.getRedis(), "price-pre-{}".format(code), tool.get_expire(), str(price))
        cls.__price_pre_cache[code] = float(price)
        price = round(float(price), 2)
        logger_pre_close_price.info(f"{code}-{price}")
        cls.__price_pre_cache[code] = price
__limit_up_price_dict = {}
l2/huaxin/huaxin_target_codes_manager.py
@@ -5,7 +5,6 @@
import queue
import time
import constant
from code_attribute import global_data_loader, code_volumn_manager, first_target_code_data_processor, gpcode_manager
from code_attribute.code_data_util import ZYLTGBUtil
@@ -14,7 +13,7 @@
from log_module.log import logger_l2_codes_subscript
from third_data import kpl_data_manager, kpl_api
from trade import current_price_process_manager
from utils import tool, global_util,  init_data_util
from utils import tool, global_util, init_data_util
redisManager = redis_manager.RedisManager(4)
l2_codes_queue = queue.Queue()
@@ -74,9 +73,18 @@
        if not global_util.zyltgb_map:
            global_data_loader.load_zyltgb()
        # 首先加载涨停价
        need_get_limit_up_codes = []
        for d in datas:
            code = d[0]
            if not gpcode_manager.get_limit_up_price(code):
                need_get_limit_up_codes.append(code)
        if need_get_limit_up_codes:
            init_data_util.re_set_price_pres(need_get_limit_up_codes, True)
        for d in datas:
            code = d[0]
            price = d[1]
            # 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
            # 剔除昨日涨停的票
            if code in yesterday_codes:
@@ -87,23 +95,29 @@
            # 获取自由流通市值
            if code not in global_util.zyltgb_map:
                try:
                # 获取自由流通量
                zylt = None
                zylt_volume = global_util.zylt_volume_map.get(code)
                if zylt_volume and d[1] > 0:
                    zylt = zylt_volume * d[1]
                if not zylt:
                    try:
                        __start_time = time.time()
                        zylt = kpl_api.getZYLTAmount(code)
                        async_log_util.info(logger_l2_codes_subscript,
                                            f"{request_id} {code}获取自由流通市值耗时-{round((time.time() - __start_time) * 1000)}ms")
                    except:
                        pass
                if zylt:
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if not limit_up_price:
                        init_data_util.re_set_price_pre(code, True)
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                    __start_time = time.time()
                    zylt = kpl_api.getZYLTAmount(code)
                    async_log_util.info(logger_l2_codes_subscript,
                                        f"{request_id} {code}获取自由流通市值耗时-{round((time.time() - __start_time) * 1000)}ms")
                    if zylt:
                        if limit_up_price:
                            zylt = int(zylt / d[1] * float(limit_up_price))
                        # 保存自由流通股本
                        ZYLTGBUtil.save_async(code, zylt // 10000, 1)
                        global_util.zyltgb_map[code] = int(zylt)
                except:
                    pass
                    if limit_up_price:
                        zylt = int(zylt / d[1] * float(limit_up_price))
                    # 保存自由流通股本
                    ZYLTGBUtil.save_async(code, zylt, price)
                    global_util.zyltgb_map[code] = int(zylt)
            # 自由流通市值不符合标准
            # zyltgb = global_util.zyltgb_map.get(code)
            # if zyltgb:
log_module/log.py
@@ -44,6 +44,10 @@
                   filter=lambda record: record["extra"].get("name") == "mysql_debug",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("code_attribute", "pre_close_price"),
                   filter=lambda record: record["extra"].get("name") == "pre_close_price",
                   rotation="00:00", compression="zip", enqueue=True)
        # 显示在控制台
        # logger.add(sys.stdout,
        #           filter=lambda record: record["extra"].get("name") == "l2_trade", enqueue=True)
@@ -447,6 +451,9 @@
logger_local_huaxin_l1_trade_info = __mylogger.get_logger("local_huaxin_l1_trade_info")
logger_local_huaxin_l2_special_volume = __mylogger.get_logger("local_huaxin_l2_special_volume")
# 收盘价日志
logger_pre_close_price = __mylogger.get_logger("pre_close_price")
def close_print():
    logging.basicConfig(level=logging.ERROR)
log_module/log_export.py
@@ -624,6 +624,23 @@
    return fdatas
def load_pre_close_price(date=None):
    """
    Load the pre-close prices recorded in the ``pre_close_price`` log of one day.

    Each non-empty line is expected to look like ``<prefix> - <code>-<price>``;
    lines that do not have that shape are skipped.

    @param date: date string used in the log file name; when omitted the value of
                 ``tool.get_now_date_str()`` at call time is used
    @return: dict mapping code -> price, both as stripped strings
    """
    if date is None:
        # Resolve the date on every call. A default evaluated in the signature is
        # computed once at import, so a long-running process would keep reading
        # the log file of the day it was started.
        date = tool.get_now_date_str()
    fdatas = {}
    path = f"{constant.get_path_prefix()}/logs/gp/code_attribute/pre_close_price.{date}.log"
    lines = __load_file_content(path)
    for line in lines:
        if not line:
            continue
        sections = line.split(" - ")
        if len(sections) < 2:
            # No " - " separator: not a record this parser understands.
            continue
        fields = sections[1].split("-")
        if len(fields) < 2:
            continue
        fdatas[fields[0].strip()] = fields[1].strip()
    return fdatas
if __name__ == '__main__':
    fdatas = get_real_place_order_positions("002404")
    print(len(fdatas))
server.py
@@ -225,25 +225,6 @@
                        else:
                            logger_l2_error.exception(e)
                elif type == 1:
                    # 设置股票代码
                    data_list, is_add = data_process.parseGPCode(_str)
                    ZYLTGBUtil.save_list(data_list)
                    code_list = []
                    for data in data_list:
                        code_list.append(data["code"])
                    # 获取基本信息
                    code_datas = HistoryKDatasUtils.get_gp_latest_info(code_list)
                    # if is_add:
                    #     gpcode_manager.add_gp_list(code_datas)
                    # else:
                    #     gpcode_manager.set_gp_list(code_datas)
                    if not is_add:
                        # 同步同花顺目标代码
                        t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                        t1.setDaemon(True)
                        t1.start()
                elif type == 2:
                    # 涨停代码
                    dataList, is_add = data_process.parseGPCode(_str)
test/test_log.py
@@ -1,8 +1,28 @@
from code_attribute.code_data_util import ZYLTGBUtil
from code_attribute.gpcode_manager import CodePrePriceManager
from huaxin_client import l1_subscript_codes_manager
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error
from third_data import history_k_data_util, kpl_api
if __name__ == "__main__":
    import logging

    # Manual check of the synchronous log path.
    use_time = 0.02
    temp_list = ["123123"]
    huaxin_l2_log.info(logger_local_huaxin_l2_error, f"耗时:{use_time}s  结束数据:{temp_list[-1]}")
    huaxin_l2_log.run_sync()

    # Collect the subscribed codes; the entries are bytes and are decoded to str.
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes = set()
    if codes_sh:
        for code_byte in codes_sh:
            codes.add(code_byte.decode())
        for code_byte in codes_sz:
            codes.add(code_byte.decode())
    # Skip the codes whose free-float volume was already refreshed today.
    updated_codes = ZYLTGBUtil.get_today_updated_volume_codes()
    codes = codes - set(updated_codes)
    # Fetch the latest prices; the symbol looks like "<prefix>.<code>".
    price_datas = history_k_data_util.JueJinApi.get_gp_current_info(codes, "symbol,price")
    price_dict = {x['symbol'].split(".")[1]: x['price'] for x in price_datas}
    for code, price in price_dict.items():
        try:
            zylt = kpl_api.getZYLTAmount(code)
            # free-float volume = free-float market value / current price
            ZYLTGBUtil.save_volume(code, int(zylt / price))
        except Exception:
            # Best effort per code: report the failure (zero price, missing value,
            # request error, ...) and carry on with the remaining codes.
            logging.exception("failed to refresh free-float volume for %s", code)
third_data/history_k_data_util.py
@@ -87,8 +87,6 @@
        return cls.__request("get_history_instruments",
                             {"symbols": symbols, "start_date": start_date, "end_date": end_date, "fields": fields})
    @classmethod
    def get_previous_trading_date(cls, exchange, date):
        return cls.__request("get_previous_trading_date", {"exchange": exchange, "date": date})
@@ -155,7 +153,7 @@
            return results
    @classmethod
    def get_gp_current_info(cls, codes):
    def get_gp_current_info(cls, codes, fields=None):
        if not codes:
            return []
        symbols = cls.get_juejin_code_list_with_prefix(codes)
@@ -165,7 +163,7 @@
            data = gmapi.current(symbols=",".join(symbols))
            return data
        else:
            data = JueJinHttpApi.current(symbols=",".join(symbols), fields='')
            data = JueJinHttpApi.current(symbols=",".join(symbols), fields=fields)
            return data
        # 返回指定日期的上个交易日
@@ -189,7 +187,7 @@
            account_id, s_id, token = cls.getJueJinAccountInfo()
            gmapi.set_token(token)
            return gmapi.get_history_instruments(symbols=symbols, start_date=start_date, end_date=end_date,
                                         fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
                                                 fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
        else:
            return JueJinHttpApi.get_history_instruments(symbols, start_date, end_date, fields)
@@ -335,11 +333,12 @@
if __name__ == "__main__":
    now_day = tool.get_now_date_str()
    results = JueJinApi.get_history_instruments(JueJinApi.get_juejin_code_list_with_prefix(["600265"]), tool.date_sub(now_day,30), tool.date_sub(now_day,1))
    results = JueJinApi.get_history_instruments(JueJinApi.get_juejin_code_list_with_prefix(["600265"]),
                                                tool.date_sub(now_day, 30), tool.date_sub(now_day, 1))
    results = results[-5:]
    normal = True
    for r in results:
        if r["sec_level"]!=1:
        if r["sec_level"] != 1:
            normal = False
            break
    print(normal)
third_data/kpl_api.py
@@ -258,4 +258,4 @@
if __name__ == "__main__":
    print(getLimitUpInfoNew())
    print(getZYLTAmount("300198"))
trade/huaxin/huaxin_trade_server.py
@@ -22,6 +22,7 @@
import outside_api_command_manager
from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer
from code_attribute import gpcode_manager, code_volumn_manager, global_data_loader
from code_attribute.code_data_util import ZYLTGBUtil
from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, WantBuyCodesManager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
@@ -47,7 +48,7 @@
    logger_system, logger_trade, logger_trade_position_api_request, logger_request_api, \
    logger_local_huaxin_l1_trade_info, logger_real_place_order_position
from output import l2_output_util
from third_data import block_info, kpl_data_manager, kpl_util
from third_data import block_info, kpl_data_manager, kpl_util, history_k_data_util, kpl_api
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager
@@ -1716,7 +1717,32 @@
                    fdatas.append(d)
                results = [output_util.money_desc(d) for d in fdatas]
                self.send_response({"code": 0, "data": results}, client_id, request_id)
            elif ctype == "refresh_zylt_volume":
                # 刷新目标代码的自由流通量
                codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
                codes = set()
                if codes_sh:
                    for code_byte in codes_sh:
                        codes.add(code_byte.decode())
                    for code_byte in codes_sz:
                        codes.add(code_byte.decode())
                updated_codes = ZYLTGBUtil.get_today_updated_volume_codes()
                codes = codes - set(updated_codes)
                # 获取最近的价格
                price_datas = history_k_data_util.JueJinApi.get_gp_current_info(codes, "symbol, price")
                price_dict = {x['symbol'].split(".")[1]: x['price'] for x in price_datas}
                for code in price_dict:
                    try:
                        zylt = kpl_api.getZYLTAmount(code)
                        ZYLTGBUtil.save_volume(code, int(zylt / price_dict[code]))
                    except:
                        pass
                self.send_response({"code": 0, "data": {}, "msg": f"更新代码数量:{len(codes)}"}, client_id, request_id)
            elif ctype == "get_today_updated_zylt_volume_count":
                # 获取今日已经更新的自由流通量的代码数量
                count = ZYLTGBUtil.count_today_updated_volume_codes()
                self.send_response({"code": 0, "data": {"count": count}}, client_id, request_id)
        except Exception as e:
            logging.exception(e)
@@ -1799,7 +1825,7 @@
                place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
                if tool.is_sz_code(code) and place_order_count == 0 and current_total_sell_data[
                    1] > 500 * 10000 and global_util.zyltgb_map.get(
                        code) < 50 * 100000000:
                    code) < 50 * 100000000:
                    # 首次下单,自由流通50亿以下,总卖额500w才能激进下单
                    mode_descs.insert(0, "成交触发")
                    last_index = total_datas[-1]["index"]
@@ -1884,6 +1910,8 @@
    threading.Thread(target=run_pending, daemon=True).start()
    l2_data_util.load_l2_data_all(True)
    L2TradeSingleDataManager.set_callback(MyL2TradeSingleCallback())
    # 加载自由流通量
    global_data_loader.load_zyltgb_volume_from_db()
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
utils/global_util.py
@@ -8,6 +8,9 @@
industry_codes_map = {}
# Mapping for free-float market value
zyltgb_map = {}
# Mapping for free-float volume
zylt_volume_map = {}
# Mapping for today's limit-up codes
today_limit_up_codes = {}
# 行业热度指数
utils/tool.py
@@ -313,7 +313,7 @@
def is_can_buy_code(code):
    if code.find("00") == 0 or code.find("60") == 0: #or code.find("30") == 0:
    if code.find("00") == 0 or code.find("60") == 0 or code.find("30") == 0:
        return True
    return False