unknown
2022-08-18 bc6204f30deb7518b6f4738e3c842598cd040545
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# 操作l2的代码
import json
import os
import queue
import threading
 
import gpcode_manager
import l2_data_manager
 
import server
import tool
import trade_manager
import time
import redis_manager
from log import logger_code_operate
 
 
class L2CodeOperate(object):
    __instance = None
    __lock = threading.RLock()
    redis_manager_ = redis_manager.RedisManager()
 
    @classmethod
    def getRedis(cls):
        return cls.redis_manager_.getRedis()
 
    @classmethod
    def get_instance(cls, *args, **kwargs):
        if not hasattr(cls, "ins"):
            insObject = cls(*args, **kwargs)
            setattr(cls, "ins", insObject)
        return getattr(cls, "ins")
 
    @staticmethod
    def setGPCode(client_id, position, gpcode):
        data = {"action": "setGPCode", "data": {"index": int(position), "code": gpcode}}
        logger_code_operate.info("setGPCode:clientid-{}  position-{} code-{}".format(client_id, position, gpcode))
        gpcode_manager.set_operate(gpcode)
        try:
            result = server.send_msg(client_id, data)
            logger_code_operate.info(
                "setGPCode结束({}):clientid-{}  position-{} code-{}".format(result, client_id, position, gpcode))
            if result.__contains__('OK'):
                gpcode_manager.set_listen_code_by_pos(client_id, position, gpcode)
 
        except Exception as e:
            logger_code_operate.error("setGPCode出错:{}", str(e))
        finally:
            gpcode_manager.rm_operate(gpcode)
 
    @classmethod
    def run(cls):
        cls.__lock.acquire()
        try:
            t1 = threading.Thread(target=lambda: L2CodeOperate.send_operate())
            # 后台运行
            t1.setDaemon(True)
            t1.start()
        finally:
            cls.__lock.release()
 
    @staticmethod
    def send_operate():
        redis = L2CodeOperate.getRedis()
        while True:
            try:
                data = redis.lpop("code_operate_queue")
                print("读取操作队列", data, redis.llen("code_operate_queue"))
                if data is not None:
                    data = json.loads(data)
                    type, code = data["type"], data["code"]
                    if type == 0:
                        # 是否在固定库
                        if l2_data_manager.is_in_l2_fixed_codes(code):
                            continue
                        if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code):
                            client_id, pos = gpcode_manager.get_listen_code_pos(code)
                            if client_id is not None and pos is not None:
                                L2CodeOperate.setGPCode(client_id, pos, "")
                    elif type == 1:
                        if trade_manager.is_in_forbidden_trade_codes(code):
                            continue
 
                        if not gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(
                                code) and not gpcode_manager.is_listen_full():
                            client_id, pos = gpcode_manager.get_can_listen_pos()
                            if pos is not None and client_id is not None:
                                L2CodeOperate.setGPCode(client_id, pos, code)
                    # 强制设置
                    elif type == 2:
                        client_id = data["client"]
                        pos = data["pos"]
                        state = L2CodeOperate.get_instance().get_operate_code_state(client_id, pos)
                        if state == 1:
                            continue
                        code_ = gpcode_manager.get_listen_code_by_pos(client_id, pos)
                        if code_ == "" or code_ is None:
                            continue
 
                        logger_code_operate.info("修复代码一致:{}-{}-{}", client, pos, code)
 
                        L2CodeOperate.setGPCode(client_id, pos, code)
                    # 修复l2的数据错误
                    elif type == 3:
                        data = data["data"]
                        client = data["client"]
                        server.send_msg(client, json.dumps(data))
 
                else:
                    time.sleep(1)
            except:
                print("发送操作异常")
 
    def add_operate(self, type, code):
        redis = self.redis_manager_.getRedis()
        print("add_operate", type, code)
        redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code}))
 
    def repaire_operate(self, client, pos, code):
 
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue", json.dumps({"type": 2, "client": client, "pos": pos, "code": code}))
 
    # 修复l2的数据错误
    def repaire_l2_data(self, code):
        logger_code_operate.info("修复单票的L2数据:" + code)
        client_id, pos = gpcode_manager.get_listen_code_pos(code)
        if client_id is not None and pos is not None:
            # 获取涨停价与跌停价
            max_price = gpcode_manager.get_limit_up_price(code)
            min_price = gpcode_manager.get_limit_down_price(code)
            data = {"action": "repairL2Data",
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue", json.dumps({"type": 3, "client": client_id, "data": data}))
 
    # 移除监控
    def remove_l2_listen(self, code):
        # 是否正在监听
        if gpcode_manager.is_listen(code):
            self.add_operate(0, code)
 
    # 设置代码操作状态,服务器保存的代码是否与实际设置的代码保持一致
    def set_operate_code_state(self, client_id, channel, state):
        self.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), tool.get_expire(), state)
 
    def get_operate_code_state(self, client_id, channel):
        value = self.getRedis().get("code-operate_state-{}-{}".format(client_id, channel))
        if value is not None:
            return int(value)
        return value