admin
2025-02-27 8d151575d6fa5696545eba6555badbdc6a544dc4
指数日志记录
6个文件已修改
82 ■■■■■ 已修改文件
data_server.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_api_client.py 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/data_cache.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/kpl_api.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/hx_qc_value_util.py 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_server.py
@@ -6,6 +6,7 @@
from http.server import BaseHTTPRequestHandler
import urllib.parse as urlparse
from log_module.log import hx_logger_l2_transaction
from strategy import data_cache
from trade import huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
@@ -184,6 +185,13 @@
                orderSysID = params.get("orderSysID")  # 系统订单编号
                result = huaxin_trade_api.cancel_order(direction, code, orderSysID, blocking=True)
                result_str = json.dumps(result)
            elif url.path == "/upload_deal_big_orders":
                # 成交大单传递
                datas = self.rfile.read(int(self.headers['content-length']))
                _str = str(datas, encoding="gbk")
                hx_logger_l2_transaction.info(_str)
                # 记录日志
                result_str = json.dumps({"code": 0})
        except Exception as e:
            result_str = json.dumps({"code": 1, "msg": str(e)})
        finally:
huaxin_client/l1_api_client.py
@@ -22,6 +22,8 @@
class sampleSpi(qcvalueaddproapi.CQCValueAddProSpi):
    __result_cache = {}
    __temp_cache = {}
    # 指数数据
    stock_index_data_dict = {}
    def __init__(self, t_tapi):
        qcvalueaddproapi.CQCValueAddProSpi.__init__(self)
@@ -210,10 +212,10 @@
                "Turnover": pStockIndexData.Turnover,
                "LXLastPrice": pStockIndexData.LXLastPrice,
            }
            self.stock_index_data_dict[data["SecurityID"]] = data
            logger_debug.info(f"指数行情应答:{data}")
        except Exception as e:
            logging.exception(e)
    def ReqQryGGTEODPrices(self):
        QryField = qcvalueaddproapi.CQCVDQryGGTEODPricesField()
@@ -300,11 +302,25 @@
            pass
def run(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue):
def __upload_datas(data_queue: multiprocessing.Queue):
    # 1s上传一次
    while True:
        try:
            if thespi.stock_index_data_dict:
                data_queue.put_nowait(("stock_index_datas", thespi.stock_index_data_dict))
        except:
            pass
        finally:
            time.sleep(1)
def run(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue,
        data_callback_queue: multiprocessing.Queue):
    """
    运行
    @param request_queue: 请求队列
    @param response_queue: 响应队列
    @param data_callback_queue: 数据回调
    @return:
    """
    global g_userid, g_passwd, g_address, g_port
@@ -329,6 +345,7 @@
    theapi.RegisterSpi(thespi)
    theapi.RegisterFront(g_address, g_port)
    threading.Thread(target=__read_request, args=(request_queue, response_queue,), daemon=True).start()
    threading.Thread(target=__upload_datas, args=(data_callback_queue,), daemon=True).start()
    theapi.Run()
main.py
@@ -96,8 +96,8 @@
    # # 开盘啦的板块强度下的个股强度回调函数
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
        print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
        logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}")
        # print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
        # logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}")
        data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
    # 板块强度下个股强度线程
strategy/data_cache.py
@@ -218,10 +218,13 @@
# 今日计划下单金额
today_planned_order_amount = 0
# 最新的市场行情数据
# {"code":(代码,昨日收盘价,最新价,总成交量,总成交额,买五档(价格,成交额),卖五档(价格,成交额),更新时间)}
latest_code_market_info_dict = {}
# 股票指数字典 例如:{"000001":(指数, 量, 额)}
stock_index_dict = {}
logging.info(f"全局初始化数据  完成《《《 - {os.getpid()}")
# 获取当前进程的PID
strategy/kpl_api.py
@@ -9,7 +9,7 @@
import requests
import constant
from log_module.log import logger_common
from log_module.log import logger_common, logger_kpl_jingxuan_in
# import requests
from strategy import data_cache
from strategy import basic_methods
@@ -219,6 +219,8 @@
def get_market_sift_plate_its_stock_power():
    data = (getMarketJingXuanRealRankingInfo())
    market_sift_plate = json.loads(data)
    # logger_kpl_jingxuan_in 打印的日志专用于开盘了数据的存储分析,不能轻易删除
    logger_kpl_jingxuan_in.info(f"{market_sift_plate}")
    # print(f"market_sift_plate 数 ======{len(market_sift_plate['list'])}")
    # logger.info(f"market_sift_plate['list']======{market_sift_plate['list']}")
    # logger.info(f"market_sift_plate['list'][0]  ======{market_sift_plate['list'][0]}")
utils/hx_qc_value_util.py
@@ -8,6 +8,7 @@
from huaxin_client import l1_api_client
from log_module.log import logger_debug
from strategy import data_cache
from utils import tool
__response_data = {}
@@ -144,12 +145,47 @@
    return None
def __read_callback_data(data_callback_queue: multiprocessing.Queue):
    """
    读取数据回调
    :param data_callback_queue:
    :return:
    """
    while True:
        try:
            data = data_callback_queue.get()
            type_ = data[0]
            data = data[1]
            if type_ == "stock_index_datas":
                for k in data:
                    """
                   值的格式为:
                    {
                       "LastPrice": pStockIndexData.LastPrice,
                       "SecurityID": pStockIndexData.SecurityID,
                       "UpdateTime": pStockIndexData.UpdateTime,
                       "Volume": pStockIndexData.Volume,
                       "Turnover": pStockIndexData.Turnover,
                       "LXLastPrice": pStockIndexData.LXLastPrice,
                   }
                   """
                    d = data[k]
                    data_cache.stock_index_dict[k] = (round(d["LastPrice"], 2), d["Volume"], d["Turnover"])
        except:
            pass
def run():
    global request_queue
    request_queue, response_queue = multiprocessing.Queue(), multiprocessing.Queue()
    request_queue, response_queue, data_callback_queue = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue()
    # 启动增值服务进程
    process = multiprocessing.Process(target=l1_api_client.run, args=(request_queue, response_queue,), daemon=True)
    process = multiprocessing.Process(target=l1_api_client.run,
                                      args=(request_queue, response_queue, data_callback_queue,), daemon=True)
    process.start()
    # 读取数据回调
    threading.Thread(target=__read_callback_data, args=(data_callback_queue,), daemon=True).start()
    __set_response(response_queue)