Administrator
2023-11-24 d6b6be5eb2ae00a8ccf46bd7d53cd7d0c1e59c72
初始化独立某些方法
1个文件已添加
10个文件已修改
328 ■■■■ 已修改文件
code_attribute/first_target_code_data_processor.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_first_screen_manager.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
inited_data.py 86 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/init_data_util.py 92 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py
@@ -5,7 +5,6 @@
# 处理首板代码信息
import constant
import inited_data
from code_attribute import gpcode_manager, gpcode_first_screen_manager, code_nature_analyse, \
    code_volumn_manager
from code_attribute.code_data_util import ZYLTGBUtil
@@ -15,7 +14,7 @@
from third_data.history_k_data_util import HistoryKDatasUtils
from ths import l2_code_operate
from trade import trade_data_manager, l2_trade_util
from utils import global_util, tool
from utils import global_util, tool, init_data_util
__CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
@@ -44,7 +43,7 @@
                # 获取涨停价
                _limit_up_price = gpcode_manager.get_limit_up_price(code)
                if not _limit_up_price:
                    inited_data.re_set_price_pres([code], True)
                    init_data_util.re_set_price_pres([code], True)
                    # 再次获取涨停价
                    _limit_up_price = gpcode_manager.get_limit_up_price(code)
                if _limit_up_price:
@@ -81,7 +80,7 @@
    for code in codes:
        # 如果涨停价是空值就需要设置昨日收盘价格
        if gpcode_manager.get_limit_up_price(code) is None:
            inited_data.re_set_price_pres([code], True)
            init_data_util.re_set_price_pres([code], True)
    # 板块关键字准备  暂时删除
    # for code in codes:
@@ -112,11 +111,11 @@
            if limit_up_price is None:
                continue
            try:
                volumes_data = inited_data.get_volumns_by_code(code, 150)
                volumes = inited_data.parse_max_volume(volumes_data[:90],
                                                       code_nature_analyse.is_new_top(
                                                           limit_up_price,
                                                           volumes_data[:90]))
                volumes_data = init_data_util.get_volumns_by_code(code, 150)
                volumes = init_data_util.parse_max_volume(volumes_data[:90],
                                                          code_nature_analyse.is_new_top(
                                                              limit_up_price,
                                                              volumes_data[:90]))
                logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
                code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
@@ -186,7 +185,7 @@
    # 获取涨停价
    if temp_codes:
        # 获取涨停价
        inited_data.re_set_price_pres(temp_codes)
        init_data_util.re_set_price_pres(temp_codes)
        # 重新获取涨停价
        for code in temp_codes:
            limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -212,7 +211,7 @@
            gpcode_manager.FirstCodeManager().add_limited_up_record([code])
        pricePre = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code)
        if pricePre is None:
            inited_data.re_set_price_pres([code])
            init_data_util.re_set_price_pres([code])
        rate = round((float(price) - pricePre) * 100 / pricePre, 1)
        prices.append(
@@ -223,11 +222,3 @@
    logger_l2_codes_subscript.info(f"({request_id})l2代码相关数据加载完成")
    return tick_datas
# Manual entry point: for one hard-coded code, read its limit-up price and
# 150 history bars, derive the K-line pattern and persist it.
if __name__ == "__main__":
    code = "002308"
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    volumes_data = inited_data.get_volumns_by_code(code, 150)
    # Save the K-line pattern
    k_format = code_nature_analyse.get_k_format(limit_up_price, volumes_data)
    code_nature_analyse.CodeNatureRecordManager().save_k_format(code, k_format)
code_attribute/gpcode_first_screen_manager.py
@@ -8,7 +8,6 @@
from db.redis_manager_delegate import RedisUtils
from utils import tool
from db import redis_manager_delegate as redis_manager
from third_data import block_info
class FirstCodeDataManager:
gui.py
@@ -10,7 +10,7 @@
import win32gui
from db.redis_manager_delegate import RedisUtils
from utils import data_export_util
from utils import data_export_util, init_data_util
import multiprocessing
from log_module import log, log_export
@@ -227,7 +227,7 @@
            sv_num.set("获取到收盘价数量:{}".format(count))
        def re_get_close_price():
            inited_data.re_set_price_pres(gpcode_manager.get_gp_list())
            init_data_util.re_set_price_pres(gpcode_manager.get_gp_list())
        def get_limit_up_codes_win():
            width = 500
huaxin_client/l1_client.py
@@ -169,7 +169,26 @@
        pass
def run(queue_l1_w_strategy_r):
# Codes of the positions currently held. Replaced as a whole by
# __read_from_strategy when a "set_position_codes" message arrives, and
# consulted when the L1 upload list is built so held codes are always included.
__position_codes = set()
def __read_from_strategy(queue_l1_r_strategy_w: multiprocessing.Queue):
    """Consume control messages written by the strategy process, forever.

    Each message is either a dict or a JSON string that decodes to one.
    The only message type acted upon is ``"set_position_codes"``: its
    ``"data"`` list replaces the module-level ``__position_codes`` set.
    Messages of any other type are ignored.

    The loop never returns, so the function is meant to be the target of a
    daemon thread.

    :param queue_l1_r_strategy_w: queue this L1 process reads from and the
        strategy process writes to.
    """
    global __position_codes
    while True:
        try:
            data = queue_l1_r_strategy_w.get()
            if isinstance(data, str):
                data = json.loads(data)
            if data["type"] == "set_position_codes":
                __position_codes = set(data["data"])
        except Exception as e:
            # A malformed message must not kill the reader thread, but it
            # should leave a trace instead of disappearing silently.
            logger_local_huaxin_l1.exception(e)
        finally:
            # Pause one second after every message, handled or not.
            time.sleep(1)
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w):
    logger_local_huaxin_l1.info("运行l1订阅服务")
    codes_sh = []
    codes_sz = []
@@ -218,6 +237,7 @@
    # level1_data_dict["002292"] = (
    #     "002292", 8.06, 9.96, 969500 * 100, time.time())
    threading.Thread(target=__read_from_strategy, args=(queue_l1_r_strategy_w,), daemon=True).start()
    # 等待程序结束
    while True:
        print("数量", len(level1_data_dict))
@@ -229,13 +249,17 @@
            # (代码,现价,涨幅,量,时间)
            list_ = [level1_data_dict[k] for k in level1_data_dict]
            flist = []
            plist = []
            for d in list_:
                if d[2] >= constant.L1_MIN_RATE:
                    # 涨幅小于5%的需要删除
                    flist.append(d)
                if d[0] in __position_codes:
                    plist.append(d)
            flist.sort(key=lambda x: x[2], reverse=True)
            datas = flist[:200]
            codes = [x[0] for x in datas]
            # 将持仓股加入进去
            datas.extend(plist)
            print("代码数量:", len(datas))
            logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
            __upload_codes_info(queue_l1_w_strategy_r, datas)
inited_data.py
@@ -208,93 +208,7 @@
        symbol = symbol.split(".")[1]
# 设置收盘价
def re_set_price_pre(code):
    """Refresh the stored previous-close price of a single code.

    Convenience wrapper that forwards a one-element list to
    ``re_set_price_pres`` with its default ``force`` value.
    """
    re_set_price_pres([code])
def re_set_price_pres(codes, force=False):
    """Fetch the latest info for ``codes`` and store each previous close.

    For every record returned by ``HistoryKDatasUtils.get_gp_latest_info``
    the second dot-separated field of ``symbol`` is used as the code, and
    ``pre_close`` goes through ``Decimal(str(...))`` and ``tool.to_price``
    before it is handed to ``CodePrePriceManager.set_price_pre`` together
    with ``force``.
    """
    for latest in HistoryKDatasUtils.get_gp_latest_info(codes):
        # Keep only the second dot-separated field of the symbol as the code.
        bare_code = latest['symbol'].split(".")[1]
        # Go through str() so the Decimal is built from the printed value.
        previous_close = tool.to_price(decimal.Decimal(str(latest['pre_close'])))
        gpcode_manager.CodePrePriceManager.set_price_pre(bare_code, previous_close, force)
# Module-level dict, empty at import time; its readers and writers are not
# visible in this excerpt.
__prices_now = {}
# 获取近90天的最大量与最近的量
# 获取最近一次涨停/涨停下一个交易日的最大值
def get_volumns_by_code(code, count=60) -> object:
    """Return history bars of ``code`` ordered by ``"bob"`` descending.

    ``count`` is passed straight to ``HistoryKDatasUtils.get_history_tick_n``
    together with the fixed field list below; the returned list is sorted
    in place and handed back.
    """
    wanted_fields = "open,high,low,close,volume,pre_close,bob,amount"
    bars = HistoryKDatasUtils.get_history_tick_n(code, count, wanted_fields)
    # Largest "bob" first - presumably the most recent bar ends up at index 0.
    bars.sort(key=lambda bar: bar["bob"], reverse=True)
    return bars
# 解析最大量
def parse_max_volume(datas, is_new_top=False):
    """Pick the reference volume (and its date) from a list of bars.

    ``datas`` is a sequence of dict-like bars read through the keys
    ``"volume"``, ``"bob"``, ``"pre_close"`` and ``"high"``. Callers are
    expected to pass it newest-first, so the bar at index ``i - 1`` is the
    trading day after the bar at index ``i``.

    Returns a 3-tuple ``(volume, volume, date)`` - the first two items are
    always the same number - where ``date`` is ``bob`` formatted as
    ``YYYY-MM-DD``.

    * ``is_new_top`` true: the largest volume in ``datas``.
    * otherwise: the volume of the first bar (scanning from index 0) whose
      high is within 0.01 of the limit-up price derived from its
      ``pre_close``, or the volume of the bar stored right before it when
      that one is larger; when no bar qualifies, the overall largest
      volume. If none of the first 60 bars touched limit-up and the chosen
      volume is more than three times the largest volume among those bars,
      that 60-bar maximum is returned instead.

    Raises ``AttributeError`` when no date could be selected (for example
    with empty ``datas``), because ``strftime`` is then called on ``None``.
    """

    def _touched_limit_up(bar):
        # Limit-up means the high is within 0.01 of the limit-up price
        # computed from the previous close.
        ceiling = float(gpcode_manager.get_limit_up_price_by_preprice(bar["pre_close"]))
        return abs(ceiling - bar["high"]) < 0.01

    def _as_day(moment):
        return moment.strftime("%Y-%m-%d")

    peak_volume, peak_moment = 0, None

    if is_new_top:
        # Breaking the previous high: simply take the largest volume.
        for bar in datas:
            if peak_volume < bar["volume"]:
                peak_volume, peak_moment = bar["volume"], bar["bob"]
        return peak_volume, peak_volume, _as_day(peak_moment)

    chosen = None
    for idx, bar in enumerate(datas):
        volume = bar["volume"]
        if peak_volume < volume:
            peak_volume, peak_moment = volume, bar["bob"]
        if not _touched_limit_up(bar):
            continue
        # Most recent limit-up bar found; the bar stored just before it
        # (the following trading day) wins if its volume is larger.
        newer_volume = datas[idx - 1]["volume"] if idx > 0 else 0
        chosen = (volume, bar["bob"])
        if volume < newer_volume:
            chosen = (newer_volume, datas[idx - 1]["bob"])
        break
    if chosen is None:
        # No limit-up bar at all: fall back to the overall maximum.
        chosen = (peak_volume, peak_moment)

    # Largest volume among the first 60 bars, stopping at the first limit-up.
    window_peak, window_moment = 0, None
    window_has_limit_up = False
    for idx in range(min(60, len(datas))):
        bar = datas[idx]
        if window_peak < bar["volume"]:
            window_peak, window_moment = bar["volume"], bar["bob"]
        if _touched_limit_up(bar):
            window_has_limit_up = True
            break

    if not window_has_limit_up and chosen[0] > window_peak * 3:
        # No limit-up within the 60 bars and their maximum is below a third
        # of the chosen volume: report the 60-bar maximum instead.
        return window_peak, window_peak, _as_day(window_moment)
    return chosen[0], chosen[0], _as_day(chosen[1])
# 保存运行时数据
main.py
@@ -1,10 +1,12 @@
"""
GUI管理
"""
import logging
import multiprocessing
import os
import threading
import constant
from db import redis_manager_delegate as redis_manager
import huaxin_client.trade_client
import huaxin_client.l2_client
@@ -12,7 +14,7 @@
from log_module import log
from log_module.log import logger_l2_trade, logger_system, logger_local_huaxin_l1
from server import *
import server
# 交易服务
from third_data import data_server
@@ -20,17 +22,18 @@
# from huaxin_api import trade_client, l2_client, l1_client
from utils import tool
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_):
                      market_queue_,queue_l1_r_strategy_w):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
    global_data_loader.init()
    server.global_data_loader.init()
    # # 数据服务
    t1 = threading.Thread(target=createDataServer, name="createDataServer", daemon=True)
@@ -38,7 +41,7 @@
    #
    # 交易接口服务
    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server",
                          args=(pipe_server, queue_other_w_l2_r),
                          args=(pipe_server, queue_other_w_l2_r, queue_l1_r_strategy_w),
                          daemon=True)
    t1.start()
    #
@@ -47,7 +50,8 @@
    t1.start()
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, order_queues_,
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_, order_queues_,
                            transaction_queues_, market_queue_)
@@ -56,7 +60,7 @@
    logger_system.info("create Server")
    laddr = "", 9001
    try:
        tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_trade=pipe)  # 注意:参数是MyBaseRequestHandle
        tcpserver = server.MyThreadingTCPServer(laddr, server.MyBaseRequestHandle, pipe_trade=pipe)  # 注意:参数是MyBaseRequestHandle
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
@@ -90,6 +94,7 @@
        queue_other_w_l2_r = multiprocessing.Queue()
        #
        queue_l1_w_strategy_r = multiprocessing.Queue()
        queue_l1_r_strategy_w = multiprocessing.Queue()
        # 交易读策略写
        queue_strategy_w_trade_r = multiprocessing.Queue()
@@ -103,7 +108,8 @@
        logger_system.info("主进程ID:{}", os.getpid())
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(queue_l1_w_strategy_r,))
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,))
        l1Process.start()
        # 交易进程
@@ -131,7 +137,7 @@
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read,
                          order_queues, transaction_queues, market_queue)
                          order_queues, transaction_queues, market_queue,queue_l1_r_strategy_w)
        # 将tradeServer作为主进程
        l1Process.join()
server.py
@@ -11,11 +11,9 @@
import time
from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util
from code_attribute import code_volumn_manager, code_nature_analyse, global_data_loader, gpcode_manager, \
    gpcode_first_screen_manager, first_target_code_data_processor
from code_attribute import code_volumn_manager, global_data_loader, gpcode_manager, first_target_code_data_processor
import constant
from user import authority
import inited_data
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer
@@ -25,15 +23,15 @@
from third_data import block_info, kpl_api
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager, KPLLimitUpDataRecordManager
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager
from ths import l2_listen_pos_health_manager, l2_code_operate, client_manager
from trade import trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager, \
from trade import trade_data_manager, trade_manager, l2_trade_util, \
    current_price_process_manager, trade_juejin
from code_attribute.code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log_module.log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_debug
from trade.huaxin import huaxin_trade_record_manager
from trade.trade_manager import TradeTargetCodeModeManager
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
@@ -954,35 +952,4 @@
                trade_manager.save_trade_success_data(fdatas)
        except:
            pass
        time.sleep(1.5)
# Manual entry point: for each hard-coded code, fetch 150 history bars and
# print the result of code_nature_analyse.is_up_too_high_in_10d for them.
if __name__ == "__main__":
    codes = ["002792"]  # gpcode_manager.FirstGPCodesManager().get_first_gp_codes()
    for code in codes:
        volumes_data = inited_data.get_volumns_by_code(code, 150)
        # volumes_data = volumes_data[1:]
        print(code, code_nature_analyse.is_up_too_high_in_10d(volumes_data))
        # NOTE: everything below, down to the sleep, is disabled experiment code.
        # try:
        #     global_data_loader.load_zyltgb()
        #     limit_up_price = float(gpcode_manager.get_limit_up_price(code))
        #     volumes_data = inited_data.get_volumns_by_code(code, 150)
        #     volumes_data = volumes_data[1:]
        #     volumes = inited_data.parse_max_volume(volumes_data[:60],
        #                                            code_nature_analyse.is_new_top(limit_up_price,
        #                                                                           volumes_data[:60]))
        #     logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
        #     code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
        #     # 判断K线形态
        #     k_format = code_nature_analyse.get_k_format(
        #         limit_up_price, volumes_data)
        #     print(k_format)
        #
        #     code_nature_analyse.set_record_datas(code,
        #                                          gpcode_manager.get_limit_up_price(code),
        #                                          volumes_data)
        # except:
        #     pass
        # code_nature_analyse.set_record_datas(code,
        #                                      limit_up_price,
        #                                      volumes_data)
        # Pause 1.5 seconds before moving on to the next code.
        time.sleep(1.5)
trade/huaxin/huaxin_trade_api.py
@@ -123,7 +123,6 @@
    queue_strategy_w_trade_r = queue_strategy_w_trade_r_
    queue_strategy_w_trade_r_for_read = queue_strategy_w_trade_r_for_read_
    t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: __run_save_data(), daemon=True)
@@ -298,7 +297,7 @@
# 网络请求
def __request(_type, data, request_id=None, blocking=False, is_pipe=True, log_enable=True,is_trade=False):
def __request(_type, data, request_id=None, blocking=False, is_pipe=True, log_enable=True, is_trade=False):
    if not request_id:
        request_id = __get_request_id(_type)
    try:
@@ -392,7 +391,7 @@
                                "price": price, "shadow_price": shadow_price, "sinfo": sinfo, "blocking": blocking},
                               request_id=request_id,
                               blocking=blocking,
                               is_pipe=is_pipe_channel_normal(),is_trade=True)
                               is_pipe=is_pipe_channel_normal(), is_trade=True)
    try:
        if blocking:
            return __read_response(request_id, blocking)
@@ -428,7 +427,7 @@
                                "orderRef": orderRef,
                                "orderActionRef": order_action_ref,
                                "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking,
                               is_pipe=is_pipe_channel_normal(),is_trade=True)
                               is_pipe=is_pipe_channel_normal(), is_trade=True)
    try:
        return __read_response(request_id, blocking)
    finally:
trade/huaxin/huaxin_trade_api_server.py
@@ -534,11 +534,11 @@
            time.sleep(1)
def run(pipe_server, queue_other_w_l2_r):
def run(pipe_server, queue_other_w_l2_r, queue_l1_r_strategy_w):
    logger_system.info("create TradeApiServer")
    logger_system.info(f"trade_api_server 线程ID:{tool.get_thread_id()}")
    # 拉取交易信息
    huaxin_trade_data_update.run()
    huaxin_trade_data_update.run(queue_l1_r_strategy_w)
    #
    t1 = threading.Thread(target=lambda: __set_target_codes(queue_other_w_l2_r), daemon=True)
    t1.start()
trade/huaxin/huaxin_trade_data_update.py
@@ -107,8 +107,14 @@
                            datas = dataJSON["data"]
                            huaxin_trade_record_manager.PositionManager.cache(datas)
                            # 获取持仓股的涨停价
                            position_codes = set()
                            for d in datas:
                                gpcode_manager.get_limit_up_price(d["securityID"])
                                if d["prePosition"] > 0:
                                    position_codes.add(d["securityID"])
                            queue_l1_r_strategy_w.put_nowait(
                                {"type": "set_position_codes", "data": list(position_codes)})
                            __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, datas)
                    async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}")
                except Exception as e1:
@@ -145,6 +151,8 @@
# 运行
def run():
def run(queue_l1_r_strategy_w_):
    global queue_l1_r_strategy_w
    queue_l1_r_strategy_w = queue_l1_r_strategy_w_
    t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
    t1.start()
utils/init_data_util.py
New file
@@ -0,0 +1,92 @@
# 设置收盘价
import decimal
from code_attribute import gpcode_manager
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool
def re_set_price_pre(code):
    """Refresh the stored previous-close price of a single code.

    Convenience wrapper that forwards a one-element list to
    ``re_set_price_pres`` with its default ``force`` value.
    """
    re_set_price_pres([code])
def re_set_price_pres(codes, force=False):
    """Fetch the latest info for ``codes`` and store each previous close.

    For every record returned by ``HistoryKDatasUtils.get_gp_latest_info``
    the second dot-separated field of ``symbol`` is used as the code, and
    ``pre_close`` goes through ``Decimal(str(...))`` and ``tool.to_price``
    before it is handed to ``CodePrePriceManager.set_price_pre`` together
    with ``force``.
    """
    for latest in HistoryKDatasUtils.get_gp_latest_info(codes):
        # Keep only the second dot-separated field of the symbol as the code.
        bare_code = latest['symbol'].split(".")[1]
        # Go through str() so the Decimal is built from the printed value.
        previous_close = tool.to_price(decimal.Decimal(str(latest['pre_close'])))
        gpcode_manager.CodePrePriceManager.set_price_pre(bare_code, previous_close, force)
# 获取近90天的最大量与最近的量
# 获取最近一次涨停/涨停下一个交易日的最大值
def get_volumns_by_code(code, count=60) -> object:
    """Return history bars of ``code`` ordered by ``"bob"`` descending.

    ``count`` is passed straight to ``HistoryKDatasUtils.get_history_tick_n``
    together with the fixed field list below; the returned list is sorted
    in place and handed back.
    """
    wanted_fields = "open,high,low,close,volume,pre_close,bob,amount"
    bars = HistoryKDatasUtils.get_history_tick_n(code, count, wanted_fields)
    # Largest "bob" first - presumably the most recent bar ends up at index 0.
    bars.sort(key=lambda bar: bar["bob"], reverse=True)
    return bars
# 解析最大量
def parse_max_volume(datas, is_new_top=False):
    """Pick the reference volume (and its date) from a list of bars.

    ``datas`` is a sequence of dict-like bars read through the keys
    ``"volume"``, ``"bob"``, ``"pre_close"`` and ``"high"``. Callers are
    expected to pass it newest-first (the order ``get_volumns_by_code``
    produces), so the bar at index ``i - 1`` is the trading day after the
    bar at index ``i``.

    Returns a 3-tuple ``(volume, volume, date)`` - the first two items are
    always the same number - where ``date`` is ``bob`` formatted as
    ``YYYY-MM-DD``.

    * ``is_new_top`` true: the largest volume in ``datas``.
    * otherwise: the volume of the first bar (scanning from index 0) whose
      high is within 0.01 of the limit-up price derived from its
      ``pre_close``, or the volume of the bar stored right before it when
      that one is larger; when no bar qualifies, the overall largest
      volume. If none of the first 60 bars touched limit-up and the chosen
      volume is more than three times the largest volume among those bars,
      that 60-bar maximum is returned instead.

    Raises ``AttributeError`` when no date could be selected (for example
    with empty ``datas``), because ``strftime`` is then called on ``None``.
    """

    def _touched_limit_up(bar):
        # Limit-up means the high is within 0.01 of the limit-up price
        # computed from the previous close.
        ceiling = float(gpcode_manager.get_limit_up_price_by_preprice(bar["pre_close"]))
        return abs(ceiling - bar["high"]) < 0.01

    def _as_day(moment):
        return moment.strftime("%Y-%m-%d")

    peak_volume, peak_moment = 0, None

    if is_new_top:
        # Breaking the previous high: simply take the largest volume.
        for bar in datas:
            if peak_volume < bar["volume"]:
                peak_volume, peak_moment = bar["volume"], bar["bob"]
        return peak_volume, peak_volume, _as_day(peak_moment)

    chosen = None
    for idx, bar in enumerate(datas):
        volume = bar["volume"]
        if peak_volume < volume:
            peak_volume, peak_moment = volume, bar["bob"]
        if not _touched_limit_up(bar):
            continue
        # Most recent limit-up bar found; the bar stored just before it
        # (the following trading day) wins if its volume is larger.
        newer_volume = datas[idx - 1]["volume"] if idx > 0 else 0
        chosen = (volume, bar["bob"])
        if volume < newer_volume:
            chosen = (newer_volume, datas[idx - 1]["bob"])
        break
    if chosen is None:
        # No limit-up bar at all: fall back to the overall maximum.
        chosen = (peak_volume, peak_moment)

    # Largest volume among the first 60 bars, stopping at the first limit-up.
    window_peak, window_moment = 0, None
    window_has_limit_up = False
    for idx in range(min(60, len(datas))):
        bar = datas[idx]
        if window_peak < bar["volume"]:
            window_peak, window_moment = bar["volume"], bar["bob"]
        if _touched_limit_up(bar):
            window_has_limit_up = True
            break

    if not window_has_limit_up and chosen[0] > window_peak * 3:
        # No limit-up within the 60 bars and their maximum is below a third
        # of the chosen volume: report the 60-bar maximum instead.
        return window_peak, window_peak, _as_day(window_moment)
    return chosen[0], chosen[0], _as_day(chosen[1])