admin
2023-08-04 ca310f014336d93eba73ed5010c1c5645424a1e0
l2_data_manager.py
@@ -4,6 +4,7 @@
import random
import threading
import time
import socket_util
from client_network import SendResponseSkManager
from mylog import logger_l2_error, logger_l2_upload
@@ -15,6 +16,7 @@
# Module-level state used by the upload helpers in this file.
# NOTE(review): presumably per-code buffers of order-detail / transaction items
# awaiting upload; their producers and consumers are outside this excerpt - confirm.
tmep_order_detail_queue_dict = {}
tmep_transaction_queue_dict = {}
# Set of codes; the __main__ block adds a code to it.
target_codes = set()
# Queue of (code, type, data) tuples: filled by add_market_data and
# add_subscript_codes, drained by __run_upload_common.
common_queue = queue.Queue()
# 添加委托详情
@@ -59,24 +61,24 @@
def add_market_data(data):
    """Upload one market-data record and also place it on the common queue.

    :param data: mapping that must contain a ``'securityID'`` key; the whole
        mapping is forwarded unchanged as the payload.

    NOTE(review): the record is passed to ``upload_data`` directly and is
    also enqueued; ``__run_upload_common`` calls ``upload_data`` on every
    queued item, so it looks like the same payload is uploaded twice once
    that worker is running - confirm this is intended.
    """
    security_id = data['securityID']
    kind = "l2_market_data"
    upload_data(security_id, kind, data)
    # Add to the upload queue.
    common_queue.put((security_id, kind, data))
def add_subscript_codes(codes):
    """Upload the subscribed codes and also place them on the common queue.

    :param codes: iterable of codes; it is converted with ``list()`` once for
        the direct upload and once more for the queued copy.

    NOTE(review): the codes are passed to ``upload_data`` directly and are
    also enqueued; ``__run_upload_common`` calls ``upload_data`` on every
    queued item, so it looks like the same payload is uploaded twice once
    that worker is running - confirm this is intended.
    """
    kind = "l2_subscript_codes"
    upload_data('', kind, list(codes))
    print("add_subscript_codes", codes)
    # Add to the upload queue (an empty string is used as the code).
    common_queue.put(('', kind, list(codes)))
def __send_response(sk, msg):
    """Send ``msg`` over ``sk`` and report whether the peer acknowledged it.

    The message is passed through ``SendResponseSkManager.format_response``
    and ``socket_util.load_header`` before being written with ``sendall``.
    A reply counts as success when its JSON body has ``"code" == 0``.

    :param sk: connected socket-like object providing ``sendall`` and ``recv``.
    :param msg: payload handed to ``SendResponseSkManager.format_response``.
    :return: ``True`` if a reply carries ``code == 0``; ``False`` otherwise,
        including when the peer closes the connection before replying.
    :raises: errors from the socket calls and ``json.loads`` are not caught
        here and propagate to the caller.
    """
    msg = SendResponseSkManager.format_response(msg)
    msg = socket_util.load_header(msg)
    sk.sendall(msg)

    # First reply: a single raw read of up to 1024 bytes.
    raw = sk.recv(1024)
    if not raw:
        # An empty read means the peer closed the connection. The previous
        # ``while True`` loop only exited on a non-empty read, so it re-issued
        # ``recv`` forever in this case. No more data can arrive: report failure.
        return False
    if json.loads(raw.decode("utf-8")).get("code") == 0:
        return True

    # NOTE(review): a second reply is read here through the framed reader
    # after the raw read above did not report success. Reading the reply
    # twice looks unintended (this call may wait for data that never
    # comes) - confirm which of the two read paths should remain.
    result, _ = socket_util.recv_data(sk)
    if result:
        result_json = json.loads(result)
        if result_json.get("code") == 0:
            return True
    return False
@@ -102,7 +104,6 @@
# 上传数据
def upload_data(code, _type, datas):
    print(time.time() * 1000)
    uid = random.randint(0, 100000)
    key = f"{_type}_{code}"
    fdata = json.dumps(
@@ -111,6 +112,7 @@
    # print("请求开始", uid, len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}")
    result = None
    start_time = time.time()
    logger_l2_upload.info(f"{code} 上传数据开始-{_type}")
    try:
        result = send_response(key, fdata.encode('utf-8'))
    except Exception as e:
@@ -138,7 +140,7 @@
            if udatas:
                upload_data(code, "l2_order", udatas)
            time.sleep(0.04)
            time.sleep(0.01)
        except Exception as e:
            logger_l2_error.error(f"上传订单数据出错:{str(e)}")
@@ -160,9 +162,22 @@
                udatas.append(temp)
            if udatas:
                upload_data(code, "l2_trans", udatas)
            time.sleep(0.1)
            time.sleep(0.01)
        except Exception as e:
            logger_l2_error.error(f"上传成交数据出错:{str(e)}")
def __run_upload_common():
    """Loop forever, uploading every item placed on ``common_queue``.

    Each queued item is indexed as ``(code, type, data)`` and forwarded to
    ``upload_data``. Once the queue is empty the loop sleeps 10 ms before
    checking again. Any exception is logged to ``logger_l2_error`` and the
    loop carries on; this function never returns.
    """
    print("__run_upload_common")
    while True:
        try:
            # Drain everything that is currently queued before pausing.
            while True:
                if common_queue.empty():
                    break
                item = common_queue.get()
                upload_data(item[0], item[1], item[2])
            time.sleep(0.01)
        except Exception as err:
            # Log the traceback first, then a short summary line.
            logger_l2_error.exception(err)
            logger_l2_error.error(f"上传普通数据出错:{str(err)}")
# 运行上传任务
@@ -180,6 +195,11 @@
        t.start()
def run_upload_common():
    """Start ``__run_upload_common`` in a background daemon thread."""
    worker = threading.Thread(target=__run_upload_common, daemon=True)
    worker.start()
if __name__ == "__main__":
    code = "603809"
    target_codes.add(code)