| | |
| | | # -*- coding: utf-8 -*- |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import os |
| | | import threading |
| | | import time |
| | |
| | | __latest_subscript_codes = set() |
| | | |
| | | |
| | | def __upload_codes_info(pipe_l2, datas): |
| | | def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas): |
| | | # Serializes datas, together with a millisecond-based request id and timestamp, to a JSON string and hands it to the channel argument when that argument is not None. |
| | | # NOTE(review): the two def lines and the pipe_l2 / queue_l1_w_strategy_r branches look like the old and new sides of a diff rendered together - confirm against the real file. |
| | | # Nothing is sent when tool.is_trade_time() is false. |
| | | if not tool.is_trade_time(): |
| | | return |
| | | # Upload the data |
| | |
| | | request_id = f"sb_{int(time.time() * 1000)}" |
| | | fdata = json.dumps( |
| | | {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)}) |
| | | if pipe_l2 is not None: |
| | | pipe_l2.send(fdata) |
| | | if queue_l1_w_strategy_r is not None: |
| | | queue_l1_w_strategy_r.put_nowait(fdata) |
| | | # Record the newly added codes: first element of each datas item, minus the codes already in __latest_subscript_codes |
| | | codes = set([x[0] for x in datas]) |
| | | add_codes = codes - __latest_subscript_codes |
| | |
| | | pass |
| | | |
| | | |
| | | def run(pipe_l2): |
| | | def run(queue_l1_w_strategy_r): |
| | | logger_local_huaxin_l1.info("运行l1订阅服务") |
| | | codes_sh = [] |
| | | codes_sz = [] |
| | |
| | | # Test link: hard-coded sample entries |
| | | # level1_data_dict["000969"] = ( |
| | | # "000969", 9.46, 9.11, 771000*100, time.time()) |
| | | # level1_data_dict["000961"] = ( |
| | | # "000961",1.93, 10.29, 2638000 * 100, time.time()) |
| | | level1_data_dict["002292"] = ( |
| | | "002292", 8.06, 9.96, 969500 * 100, time.time()) |
| | | |
| | | # Wait for the program to finish |
| | | while True: |
| | |
| | | codes = [x[0] for x in datas] |
| | | print("代码数量:", len(datas)) |
| | | logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas)) |
| | | __upload_codes_info(pipe_l2, datas) |
| | | __upload_codes_info(queue_l1_w_strategy_r, datas) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |