Administrator
2022-09-08 e7f8c6013d777dd5ba10b8d548d2d3db6158d37a
l2_code_operate.py
@@ -5,6 +5,7 @@
import queue
import threading
import data_process
import gpcode_manager
import l2_data_manager
@@ -105,20 +106,30 @@
                        L2CodeOperate.setGPCode(client_id, pos, code)
                    # 修复l2的数据错误
                    elif type == 3:
                        if tool.is_trade_time():
                            client = data["client"]
                            data = data["data"]
                            result = server.send_msg(client, data)
                            print("L2數據修復結果:", result)
                        else:
                            print("非交易时间,放弃修复L2")
                    elif type == 4:
                        # 清理监听位置
                        client = data["client"]
                        data=data["data"]
                        result = server.send_msg(client, data)
                        print("L2數據修復結果:",result)
                        pos = data["pos"]
                        L2CodeOperate.setGPCode(client, pos, "")
                else:
                    time.sleep(1)
            except Exception as e:
                logging.exception(e)
                print("发送操作异常:",str(e))
                print("发送操作异常:", str(e))
    def add_operate(self, type, code):
    def add_operate(self, type, code, client=None, pos=None):
        redis = self.redis_manager_.getRedis()
        print("add_operate", type, code)
        redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code}))
        redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code, "client": client, "pos": pos}))
    def repaire_operate(self, client, pos, code):
        # 如果本来该位置代码为空则不用修复
@@ -141,8 +152,8 @@
            data = {"action": "repairL2Data",
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue", json.dumps({"type": 3,"code":code, "client": client_id, "data": data}))
            redis = self.redis_manager_.getRedis()
            redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data}))
    # 移除监控
    def remove_l2_listen(self, code):
@@ -160,3 +171,41 @@
        if value is not None:
            return int(value)
        return value
# 获取客户端正在监听的代码
def get_listen_codes_from_client(client_id):
    data = {"action": "getL2Codes"}
    result = server.send_msg(client_id, data)
    result = json.loads(result)
    if result["code"] == 0:
        data = json.loads(result["data"])
        codes = data["data"]
        result_list = {}
        for d in codes:
            result_list[d["index"]]=d["code"]
        return result_list
    else:
        raise Exception("获取客户端监听代码出错")
# 矫正客户端代码
def correct_client_codes():
    client_ids = data_process.getValidL2Clients()
    for client_id in client_ids:
        try:
            index_codes = get_listen_codes_from_client(client_id)
            for index in range(0, 8):
                code = gpcode_manager.get_listen_code_by_pos(client_id, index)
                if code is not None and len(code) > 0 and index_codes.get(index) != code:
                    # 修复代码
                    L2CodeOperate().repaire_operate(client_id, index, code)
                elif code is None or len(code) == 0 and index_codes.get(index) is not None:
                    # 删除前端代码位
                    L2CodeOperate().add_operate(4, "", client_id, index)
        except Exception as e:
            logger_code_operate.error("client:{} msg:{}".format(client_id, str(e)))
if __name__ == "__main__":
    correct_client_codes()