"""
|
客户端管理,用于向客户端发送指令
|
"""
|
|
# 操作l2的代码
|
import json
|
import logging
|
import threading
|
|
from db.redis_manager_delegate import RedisUtils
|
from ths import client_manager
|
from code_attribute import gpcode_manager
|
from l2 import l2_data_manager
|
from trade import l2_trade_util
|
|
from servers import server
|
from utils import tool
|
import time
|
from db import redis_manager_delegate as redis_manager
|
from log_module.log import logger_code_operate
|
|
__reset_code_dict = {}
|
__set_operate_code_state_dict = {}
|
|
|
class L2CodeOperate(object):
|
__instance = None
|
__lock = threading.RLock()
|
redis_manager_ = redis_manager.RedisManager()
|
|
@classmethod
|
def getRedis(cls):
|
return cls.redis_manager_.getRedis()
|
|
@classmethod
|
def get_instance(cls, *args, **kwargs):
|
if not hasattr(cls, "ins"):
|
insObject = cls(*args, **kwargs)
|
setattr(cls, "ins", insObject)
|
return getattr(cls, "ins")
|
|
@staticmethod
|
def setGPCode(client_id, position, gpcode):
|
print("setGPCode")
|
data = {"action": "setGPCode", "data": {"index": int(position), "code": gpcode}}
|
logger_code_operate.info("setGPCode:clientid-{} position-{} code-{}".format(client_id, position, gpcode))
|
gpcode_manager.set_operate(gpcode)
|
try:
|
result = server.send_msg(client_id, data)
|
logger_code_operate.info(
|
"setGPCode结束({}):clientid-{} position-{} code-{}".format(result, client_id, position, gpcode))
|
jsonData = json.loads(result)
|
if jsonData["code"] == 0:
|
gpcode_manager.set_listen_code_by_pos(client_id, position, gpcode)
|
L2CodeOperate.set_operate_code_state(client_id, position, 1)
|
|
except Exception as e:
|
logging.exception(e)
|
logger_code_operate.error("setGPCode出错:{}", str(e))
|
finally:
|
gpcode_manager.rm_operate(gpcode)
|
|
@staticmethod
|
def betchSetGPCode(client_id, codes_info):
|
# codes_info 格式[(0,"000333")]
|
datas = []
|
for info in codes_info:
|
datas.append({"index": info[0], "code": info[1]})
|
|
data = {"action": "betchSetGPCodes", "data": datas, "force": True}
|
logger_code_operate.info("betchSetGPCodes:clientid-{} info-{}".format(client_id, codes_info))
|
codes = []
|
for item in codes_info:
|
codes.append(item[1])
|
gpcode_manager.set_operates(codes)
|
try:
|
result = server.send_msg(client_id, data)
|
logger_code_operate.info(
|
"betchSetGPCodes结束({}):clientid-{} info-{}".format(result, client_id, codes_info))
|
jsonData = json.loads(result)
|
if jsonData["code"] == 0:
|
for item in codes_info:
|
gpcode_manager.set_listen_code_by_pos(client_id, item[0], item[1])
|
L2CodeOperate.set_operate_code_state(client_id, item[0], 1)
|
return True
|
else:
|
return False
|
|
except Exception as e:
|
logging.exception(e)
|
logger_code_operate.error("setGPCode出错:{}", str(e))
|
finally:
|
gpcode_manager.rm_operates(codes)
|
return False
|
|
@classmethod
|
def run(cls):
|
cls.__lock.acquire()
|
try:
|
t1 = threading.Thread(target=lambda: L2CodeOperate.send_operate())
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
finally:
|
cls.__lock.release()
|
|
@classmethod
|
def send_operate(cls):
|
redis = L2CodeOperate.getRedis()
|
while True:
|
cls.set_read_queue_valid()
|
try:
|
data = RedisUtils.lpop(redis, "code_operate_queue", auto_free= False)
|
# print("读取操作队列", data, redis.llen("code_operate_queue"))
|
if data is not None:
|
data = json.loads(data)
|
# logger_code_operate.info("读取操作队列:{}", data)
|
type, code = data["type"], data.get("code")
|
create_time = data.get("create_time")
|
if create_time is not None:
|
# 设置10s超时时间
|
if round(time.time() * 1000) - create_time > 15 * 1000:
|
# logger_code_operate.debug("读取操作超时:{}", data)
|
continue
|
|
if type == 0:
|
# 是否在固定库
|
if l2_data_manager.is_in_l2_fixed_codes(code):
|
continue
|
if gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate(code):
|
client_id, pos = gpcode_manager.get_listen_code_pos(code)
|
if client_id is not None and pos is not None:
|
L2CodeOperate.setGPCode(client_id, pos, "")
|
elif type == 1:
|
if l2_trade_util.is_in_forbidden_trade_codes(code):
|
continue
|
|
if not gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate(
|
code) and not gpcode_manager.is_listen_full():
|
client_id, pos = gpcode_manager.get_can_listen_pos()
|
if pos is not None and client_id is not None:
|
L2CodeOperate.setGPCode(client_id, pos, code)
|
elif type == 10:
|
# 批量设置代码,通常在9:25-9:27期间设置
|
client_id = data.get("client_id")
|
codes = data[codes]
|
# TODO 需要完善分配
|
|
|
|
# 强制设置
|
elif type == 2:
|
client_id = data["client"]
|
pos = data["pos"]
|
state = L2CodeOperate.get_instance().get_operate_code_state(client_id, pos)
|
if state == 1:
|
continue
|
code_ = gpcode_manager.get_listen_code_by_pos(client_id, pos)
|
if code_ == "" or code_ is None:
|
continue
|
|
logger_code_operate.info("修复代码一致:{}-{}-{}", client_id, pos, code)
|
|
L2CodeOperate.setGPCode(client_id, pos, code)
|
# 修复l2的数据错误
|
elif type == 3:
|
if tool.is_repaire_time():
|
client = data["client"]
|
data = data["data"]
|
result = server.send_msg(client, data)
|
print("L2數據修復結果:", result)
|
else:
|
print("非修复时间,放弃修复L2")
|
elif type == 4:
|
# 清理监听位置
|
client = data["client"]
|
pos = data["pos"]
|
L2CodeOperate.setGPCode(client, pos, "")
|
else:
|
time.sleep(1)
|
except Exception as e:
|
logging.exception(e)
|
print("发送操作异常:", str(e))
|
|
def add_operate(self, type, code, msg="", client=None, pos=None):
|
# 09:25:10之后才能操作
|
if int(tool.get_now_time_str().replace(":", "")) < int("092510"):
|
return
|
RedisUtils.rpush(self.redis_manager_.getRedis(), "code_operate_queue",
|
json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos,
|
"create_time": round(time.time() * 1000)}))
|
|
def repaire_operate(self, client, pos, code):
|
# 如果本来该位置代码为空则不用修复
|
code_ = gpcode_manager.get_listen_code_by_pos(client, pos)
|
if code_ == "" or code_ is None:
|
return
|
logger_code_operate.info("客户端位置代码修复:client-{},pos-{},code-{}", client, pos, code)
|
self.add_operate(2, code, "", client, pos)
|
|
# 修复l2的数据错误
|
def repaire_l2_data(self, code):
|
logger_code_operate.info("修复单票的L2数据:" + code)
|
client_id, pos = gpcode_manager.get_listen_code_pos(code)
|
if client_id is not None and pos is not None:
|
# 获取涨停价与跌停价
|
max_price = gpcode_manager.get_limit_up_price(code)
|
min_price = gpcode_manager.get_limit_down_price(code)
|
data = {"action": "repairL2Data",
|
"data": {"index": int(pos), "code": code, "min_price": float(min_price),
|
"max_price": float(max_price)}}
|
RedisUtils.rpush(self.redis_manager_.getRedis(), "code_operate_queue", json.dumps(
|
{"type": 3, "code": code, "client": client_id, "data": data, "create_time": round(time.time() * 1000)}))
|
|
# 移除监控
|
def remove_l2_listen(self, code, msg):
|
# 是否正在监听
|
if gpcode_manager.is_listen_old(code):
|
self.add_operate(0, code, msg=msg)
|
|
# 设置代码操作状态,服务器保存的代码是否与实际设置的代码保持一致
|
@classmethod
|
def set_operate_code_state(cls, client_id, channel, state):
|
RedisUtils.setex(cls.getRedis(), "code-operate_state-{}-{}".format(client_id, channel), 10, state)
|
|
def get_operate_code_state(self, client_id, channel):
|
value = RedisUtils.get(self.getRedis(), "code-operate_state-{}-{}".format(client_id, channel))
|
if value is not None:
|
return int(value)
|
return value
|
|
# 设置读取队列有效
|
@classmethod
|
def set_read_queue_valid(cls):
|
RedisUtils.setex(cls.getRedis(), "operate_queue_read_state", 20, 1)
|
|
@classmethod
|
def is_read_queue_valid(cls):
|
return RedisUtils.get(cls.getRedis(), "operate_queue_read_state") is not None
|
|
|
# 通过l2代码校验代码位
|
@tool.async_call
|
def verify_with_l2_data_pos_info(code, client, channel):
|
code_ = gpcode_manager.get_listen_code_by_pos(client, channel)
|
if code_ != code:
|
key = "{}-{}-{}".format(client, channel, code)
|
|
# 间隔2s
|
if key not in __reset_code_dict or round(
|
time.time() * 1000) - __reset_code_dict[key] > 2000:
|
|
L2CodeOperate.set_operate_code_state(client, channel, 0)
|
__reset_code_dict[key] = round(time.time() * 1000)
|
if code_ is None:
|
code_ = ""
|
if tool.is_repaire_time():
|
L2CodeOperate().repaire_operate(int(client), int(channel), code_)
|
else:
|
key = "{}-{}".format(client, channel)
|
if key not in __set_operate_code_state_dict or round(
|
time.time() * 1000) - __set_operate_code_state_dict[key] > 1000:
|
__set_operate_code_state_dict[key] = round(time.time() * 1000)
|
L2CodeOperate.set_operate_code_state(client, channel, 1)
|
|
|
# 获取客户端正在监听的代码
|
def get_listen_codes_from_client(client_id):
|
data = {"action": "getL2Codes"}
|
result = server.send_msg(client_id, data)
|
result = json.loads(result)
|
if result["code"] == 0:
|
data = json.loads(result["data"])
|
codes = data["data"]
|
result_list = {}
|
if codes is not None:
|
for d in codes:
|
result_list[d["index"]] = d["code"]
|
return result_list
|
else:
|
raise Exception("获取客户端监听代码出错:{}".format(result))
|
|
|
# 矫正客户端代码
|
def correct_client_codes():
|
client_ids = client_manager.getValidL2Clients()
|
for client_id in client_ids:
|
try:
|
index_codes = get_listen_codes_from_client(client_id)
|
for index in range(0, 8):
|
code = gpcode_manager.get_listen_code_by_pos(client_id, index)
|
if code is not None and len(code) > 0 and index_codes.get(index) != code:
|
# 修复时间才修复代码
|
if tool.is_repaire_time():
|
L2CodeOperate().repaire_operate(client_id, index, code)
|
elif code is None or len(code) == 0 and index_codes.get(index) is not None:
|
# 删除前端代码位
|
# L2CodeOperate().add_operate(4, "", client_id, index)
|
pass
|
except Exception as e:
|
logger_code_operate.error("client:{} msg:{}".format(client_id, str(e)))
|
|
|
# 批量设置代码
|
def betch_set_client_codes(client_id, codes_info):
|
# 获取涨幅前16位代码
|
return L2CodeOperate.betchSetGPCode(client_id, codes_info)
|
|
|
if __name__ == "__main__":
|
codes = [(0, "000615"), (1, "002264"), (2, "600225"), (3, "002495"), (4, "600572"), (5, "002279"), (6, "002591"),
|
(7, "002880")]
|
L2CodeOperate.betchSetGPCode(3, codes)
|