Administrator
2023-11-02 eb33b717023d9871bd74e6dce47a065228cffefc
huaxin_client/l1_client.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import json
import logging
import multiprocessing
import os
import threading
import time
@@ -122,7 +123,7 @@
__latest_subscript_codes = set()
def __upload_codes_info(pipe_l2, datas):
def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
    if not tool.is_trade_time():
        return
    # 上传数据
@@ -130,8 +131,8 @@
    request_id = f"sb_{int(time.time() * 1000)}"
    fdata = json.dumps(
        {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)})
    if pipe_l2 is not None:
        pipe_l2.send(fdata)
    if queue_l1_w_strategy_r is not None:
        queue_l1_w_strategy_r.put_nowait(fdata)
    # 记录新增加的代码
    codes = set([x[0] for x in datas])
    add_codes = codes - __latest_subscript_codes
@@ -160,7 +161,7 @@
        pass
def run(pipe_l2):
def run(queue_l1_w_strategy_r):
    logger_local_huaxin_l1.info("运行l1订阅服务")
    codes_sh = []
    codes_sz = []
@@ -203,8 +204,8 @@
    # 测试链路
    # level1_data_dict["000969"] = (
    #     "000969", 9.46, 9.11, 771000*100, time.time())
    # level1_data_dict["000961"] = (
    #     "000961",1.93, 10.29, 2638000 * 100, time.time())
    level1_data_dict["002292"] = (
        "002292", 8.06, 9.96, 969500 * 100, time.time())
    # 等待程序结束
    while True:
@@ -226,7 +227,7 @@
            codes = [x[0] for x in datas]
            print("代码数量:", len(datas))
            logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
            __upload_codes_info(pipe_l2, datas)
            __upload_codes_info(queue_l1_w_strategy_r, datas)
        except Exception as e:
            logging.exception(e)
        finally: