"""
|
客户端管理,用于向客户端发送指令
|
"""
|
|
# 操作l2的代码
|
import json
|
import logging
|
import threading
|
|
import data_process
|
import gpcode_manager
|
import l2_data_manager
|
|
import server
|
import tool
|
import trade_manager
|
import time
|
import redis_manager
|
from log import logger_code_operate
|
|
|
class L2CodeOperate(object):
    """Queue and dispatch L2 listen-slot operations to clients.

    Producers (``add_operate``, ``repaire_operate``, ``repaire_l2_data``,
    ``remove_l2_listen``) push JSON entries onto the Redis list
    ``code_operate_queue``.  The background loop started by ``run`` pops
    them one at a time and executes them.

    Operation types understood by the consumer:
        0 -- remove the listener for a code (the slot is set to ``""``)
        1 -- start listening to a code on a free slot
        2 -- force-set a slot to a code (consistency repair)
        3 -- forward a prepared payload to a client to repair its L2 data
        4 -- clear a listening slot
    """

    __instance = None
    __lock = threading.RLock()
    redis_manager_ = redis_manager.RedisManager()

    # Name of the Redis list used as the operation queue.
    _QUEUE_KEY = "code_operate_queue"
    # Queued operations older than this many milliseconds are discarded.
    _OPERATE_TIMEOUT_MS = 20 * 1000

    @classmethod
    def getRedis(cls):
        """Return the Redis connection provided by the shared manager."""
        return cls.redis_manager_.getRedis()

    @classmethod
    def get_instance(cls, *args, **kwargs):
        """Return the instance cached on the class, creating it on first use.

        ``args`` / ``kwargs`` are only used for the very first construction.
        """
        if not hasattr(cls, "ins"):
            insObject = cls(*args, **kwargs)
            setattr(cls, "ins", insObject)
        return getattr(cls, "ins")

    @staticmethod
    def setGPCode(client_id, position, gpcode):
        """Send a ``setGPCode`` action to a client for one slot.

        Args:
            client_id: client to send the message to.
            position: slot index; converted with ``int()`` for the message.
            gpcode: code to set on the slot.  Callers in this class pass
                ``""`` to clear a slot.

        The code is flagged as "being operated" for the duration of the
        call.  When the client answers with ``code == 0`` the server-side
        slot record is updated and the slot's operate state is set to 1.
        Errors are logged, never raised.
        """
        data = {"action": "setGPCode", "data": {"index": int(position), "code": gpcode}}
        logger_code_operate.info("setGPCode:clientid-{} position-{} code-{}".format(client_id, position, gpcode))
        gpcode_manager.set_operate(gpcode)
        try:
            result = server.send_msg(client_id, data)
            logger_code_operate.info(
                "setGPCode结束({}):clientid-{} position-{} code-{}".format(result, client_id, position, gpcode))
            jsonData = json.loads(result)
            if jsonData["code"] == 0:
                gpcode_manager.set_listen_code_by_pos(client_id, position, gpcode)
                L2CodeOperate.set_operate_code_state(client_id, position, 1)
        except Exception as e:
            logger_code_operate.error("setGPCode出错:{}", str(e))
        finally:
            # Always clear the "being operated" flag, even on failure.
            gpcode_manager.rm_operate(gpcode)

    @classmethod
    def run(cls):
        """Start the queue consumer (``send_operate``) in a daemon thread."""
        with cls.__lock:
            # daemon=True replaces Thread.setDaemon(), deprecated since 3.10.
            worker = threading.Thread(target=L2CodeOperate.send_operate, daemon=True)
            worker.start()

    @staticmethod
    def send_operate():
        """Consume the operation queue forever; this method never returns.

        Sleeps one second when the queue is empty.  Failures while handling
        an entry are logged and the loop continues with the next entry.
        """
        redis = L2CodeOperate.getRedis()
        while True:
            try:
                raw = redis.lpop(L2CodeOperate._QUEUE_KEY)
            except Exception as e:
                logging.exception(e)
                print("发送操作异常:", str(e))
                # The queue itself is unreachable: back off instead of
                # spinning and flooding the log until Redis recovers.
                time.sleep(1)
                continue

            if raw is None:
                time.sleep(1)
                continue

            try:
                L2CodeOperate._handle_operate(json.loads(raw))
            except Exception as e:
                logging.exception(e)
                print("发送操作异常:", str(e))

    @staticmethod
    def _handle_operate(data):
        """Execute one decoded queue entry (types listed on the class)."""
        # NOTE(review): the brace-style positional logging calls below rely on
        # logger_code_operate doing str.format-style interpolation - confirm.
        logger_code_operate.info("读取操作队列:{}", data)
        op_type, code = data["type"], data["code"]

        create_time = data.get("create_time")
        if create_time is not None:
            # Drop entries that waited in the queue for more than 20 seconds.
            if round(time.time() * 1000) - create_time > L2CodeOperate._OPERATE_TIMEOUT_MS:
                logger_code_operate.debug("读取操作超时:{}", data)
                return

        if op_type == 0:
            # Codes in the fixed L2 list are never removed.
            if l2_data_manager.is_in_l2_fixed_codes(code):
                return
            if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code):
                client_id, pos = gpcode_manager.get_listen_code_pos(code)
                if client_id is not None and pos is not None:
                    L2CodeOperate.setGPCode(client_id, pos, "")
        elif op_type == 1:
            if trade_manager.is_in_forbidden_trade_codes(code):
                return
            if (not gpcode_manager.is_listen(code)
                    and not gpcode_manager.is_operate(code)
                    and not gpcode_manager.is_listen_full()):
                client_id, pos = gpcode_manager.get_can_listen_pos()
                if pos is not None and client_id is not None:
                    L2CodeOperate.setGPCode(client_id, pos, code)
        elif op_type == 2:
            # Forced set.
            client_id = data["client"]
            pos = data["pos"]
            state = L2CodeOperate.get_instance().get_operate_code_state(client_id, pos)
            if state == 1:
                # The slot still carries the short-lived state written after
                # a successful setGPCode; nothing to repair.
                return
            code_ = gpcode_manager.get_listen_code_by_pos(client_id, pos)
            if code_ == "" or code_ is None:
                return
            logger_code_operate.info("修复代码一致:{}-{}-{}", client_id, pos, code)
            L2CodeOperate.setGPCode(client_id, pos, code)
        elif op_type == 3:
            # Repair L2 data errors; only attempted during trading hours.
            if tool.is_trade_time():
                client = data["client"]
                payload = data["data"]
                result = server.send_msg(client, payload)
                print("L2數據修復結果:", result)
            else:
                print("非交易时间,放弃修复L2")
        elif op_type == 4:
            # Clear the listening slot.
            client = data["client"]
            pos = data["pos"]
            L2CodeOperate.setGPCode(client, pos, "")

    def add_operate(self, type, code, msg="", client=None, pos=None):
        """Append an operation to the queue, stamped with the current time.

        Args:
            type: operation type (0-4, see the class docstring).  The name
                shadows the builtin but is kept so keyword callers still work.
            code: code the operation applies to.
            msg: free-form message stored with the entry.
            client: target client id, read by operation types 2 and 4.
            pos: target slot, read by operation types 2 and 4.
        """
        redis = self.redis_manager_.getRedis()
        entry = {"type": type, "msg": msg, "code": code, "client": client, "pos": pos,
                 "create_time": round(time.time() * 1000)}
        redis.rpush(self._QUEUE_KEY, json.dumps(entry))

    def repaire_operate(self, client, pos, code):
        """Queue a forced set (type 2) of ``code`` on a client's slot.

        Does nothing when the server has no code recorded for that slot.
        """
        code_ = gpcode_manager.get_listen_code_by_pos(client, pos)
        if code_ == "" or code_ is None:
            return
        logger_code_operate.info("客户端位置代码修复:client-{},pos-{},code-{}", client, pos, code)
        self.add_operate(2, code, "", client, pos)

    def repaire_l2_data(self, code):
        """Queue an L2 data repair (type 3) for a code that is being listened to.

        The queued payload carries the slot index plus the limit-down and
        limit-up prices of the code.  Nothing is queued when no listening
        position is recorded for the code.
        """
        logger_code_operate.info("修复单票的L2数据:" + code)
        client_id, pos = gpcode_manager.get_listen_code_pos(code)
        if client_id is None or pos is None:
            return
        # Limit-up and limit-down prices.
        max_price = gpcode_manager.get_limit_up_price(code)
        min_price = gpcode_manager.get_limit_down_price(code)
        data = {"action": "repairL2Data",
                "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                         "max_price": float(max_price)}}
        redis = self.redis_manager_.getRedis()
        entry = {"type": 3, "code": code, "client": client_id, "data": data,
                 "create_time": round(time.time() * 1000)}
        redis.rpush(self._QUEUE_KEY, json.dumps(entry))

    def remove_l2_listen(self, code, msg):
        """Queue removal (type 0) of a code's listener if it is being listened to."""
        if gpcode_manager.is_listen(code):
            self.add_operate(0, code, msg=msg)

    @classmethod
    def set_operate_code_state(cls, client_id, channel, state):
        """Store the operate state of a client slot in Redis.

        The state records whether the code saved on the server matches the
        code actually set on the client.  It is written with ``setex`` and
        a time argument of 10, so it expires by itself.
        """
        cls.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), 10, state)

    def get_operate_code_state(self, client_id, channel):
        """Return the stored operate state as ``int``, or ``None`` if absent."""
        value = self.getRedis().get("code-operate_state-{}-{}".format(client_id, channel))
        return int(value) if value is not None else None
|
|
|
def get_listen_codes_from_client(client_id):
    """Fetch the codes a client is currently listening to.

    Sends a ``getL2Codes`` action to the client and decodes its reply.

    Args:
        client_id: client to query.

    Returns:
        dict mapping each reported ``index`` to its ``code``; empty when the
        client reports no code list.

    Raises:
        Exception: when the reply's ``code`` field is not 0.
    """
    response = json.loads(server.send_msg(client_id, {"action": "getL2Codes"}))
    if response["code"] != 0:
        raise Exception("获取客户端监听代码出错:{}".format(response))

    # The "data" field is itself a JSON document wrapping the code list.
    entries = json.loads(response["data"])["data"]
    if entries is None:
        return {}
    return {entry["index"]: entry["code"] for entry in entries}
|
|
|
def correct_client_codes():
    """Correct client slots that disagree with the server-side records.

    For every client returned by ``data_process.getValidL2Clients()``, the
    codes reported by the client are compared with the server's recorded
    code for slots 0-7.  When the server has a code for a slot and the
    client reports something else, a repair is queued, but only during
    trading hours.  A failure for one client is logged and does not stop
    the remaining clients from being checked.

    NOTE: slots that are empty on the server but still occupied on the
    client are intentionally left alone; clearing them was disabled in the
    original implementation.
    """
    operator = L2CodeOperate.get_instance()
    for client_id in data_process.getValidL2Clients():
        try:
            index_codes = get_listen_codes_from_client(client_id)
            for index in range(8):
                code = gpcode_manager.get_listen_code_by_pos(client_id, index)
                if code is None or len(code) == 0:
                    continue
                # Repair only during trading hours.
                if index_codes.get(index) != code and tool.is_trade_time():
                    operator.repaire_operate(client_id, index, code)
        except Exception as e:
            logger_code_operate.error("client:{} msg:{}".format(client_id, str(e)))
|
|
|
if __name__ == "__main__":
    # Run as a script: perform a single correction pass over the clients.
    correct_client_codes()
|