# Code for operating L2
|
import json
|
import os
|
import queue
|
import threading
|
|
import gpcode_manager
|
import l2_data_manager
|
|
import server
|
import tool
|
import trade_manager
|
import time
|
import redis_manager
|
from log import logger_code_operate
|
|
|
class L2CodeOperate(object):
    """Queue-driven controller for the codes that L2 clients listen to.

    Requests are JSON objects pushed onto the Redis list
    ``code_operate_queue`` and consumed one at a time by the background
    thread started with :meth:`run`. The ``type`` field selects the action:

    * ``0`` - stop listening to ``code``
    * ``1`` - start listening to ``code`` on a free position
    * ``2`` - force ``code`` onto the given ``client`` / ``pos``
    * ``3`` - forward a prepared payload (``data``) to ``client``
    """

    __instance = None
    # Guards thread start-up in run().
    __lock = threading.RLock()
    # Shared Redis manager; evaluated once when the class is defined.
    redis_manager_ = redis_manager.RedisManager()

    @classmethod
    def getRedis(cls):
        """Return a Redis client obtained from the shared manager."""
        return cls.redis_manager_.getRedis()

    @classmethod
    def get_instance(cls, *args, **kwargs):
        """Return the singleton instance, creating it on first use.

        The instance is cached on the class under the attribute ``ins``;
        ``args`` / ``kwargs`` are only used for the first construction.
        """
        if not hasattr(cls, "ins"):
            insObject = cls(*args, **kwargs)
            setattr(cls, "ins", insObject)
        return getattr(cls, "ins")

    @staticmethod
    def setGPCode(client_id, position, gpcode):
        """Send a ``setGPCode`` message to a client and record the result.

        Args:
            client_id: Target client, passed through to ``server.send_msg``.
            position: Listening slot; converted with ``int()`` for the message.
            gpcode: Code to place on the slot. ``send_operate`` passes ``""``
                here when it removes a listener.

        The code is flagged via ``gpcode_manager.set_operate`` for the
        duration of the call and always unflagged afterwards. The listening
        position is only stored when the reply contains ``'OK'``. Send
        errors are logged, never raised.
        """
        data = {"action": "setGPCode", "data": {"index": int(position), "code": gpcode}}
        logger_code_operate.info("setGPCode:clientid-{} position-{} code-{}".format(client_id, position, gpcode))
        gpcode_manager.set_operate(gpcode)
        try:
            result = server.send_msg(client_id, data)
            logger_code_operate.info(
                "setGPCode结束({}):clientid-{} position-{} code-{}".format(result, client_id, position, gpcode))
            if 'OK' in result:
                gpcode_manager.set_listen_code_by_pos(client_id, position, gpcode)
        except Exception as e:
            # Pre-format the message, as every other log call in this class does.
            logger_code_operate.error("setGPCode出错:{}".format(str(e)))
        finally:
            gpcode_manager.rm_operate(gpcode)

    @classmethod
    def run(cls):
        """Start the queue consumer (:meth:`send_operate`) in a daemon thread."""
        with cls.__lock:
            # daemon=True replaces the deprecated Thread.setDaemon(True).
            worker = threading.Thread(target=lambda: L2CodeOperate.send_operate(), daemon=True)
            worker.start()

    @staticmethod
    def send_operate():
        """Poll ``code_operate_queue`` forever and execute each request.

        Sleeps one second whenever the queue is empty. Any exception raised
        while reading or handling a request is logged and the loop goes on
        with the next item.
        """
        redis = L2CodeOperate.getRedis()
        while True:
            try:
                data = redis.lpop("code_operate_queue")
                print("读取操作队列", data, redis.llen("code_operate_queue"))
                if data is None:
                    time.sleep(1)
                    continue
                L2CodeOperate._handle_operate(json.loads(data))
            except Exception as e:
                print("发送操作异常")
                # Record the cause as well; the bare print alone hid real bugs.
                logger_code_operate.error("发送操作异常:{}".format(str(e)))

    @staticmethod
    def _handle_operate(data):
        """Execute one decoded queue request (see the class docstring).

        Args:
            data: Dict decoded from a queue entry; must contain ``type``.
                Types 0-2 also need ``code``; types 2 and 3 need ``client``.

        Unknown types are ignored.
        """
        op_type = data["type"]
        if op_type == 0:
            code = data["code"]
            # Codes in the fixed L2 set are never removed.
            if l2_data_manager.is_in_l2_fixed_codes(code):
                return
            if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code):
                client_id, pos = gpcode_manager.get_listen_code_pos(code)
                if client_id is not None and pos is not None:
                    # An empty code clears the slot.
                    L2CodeOperate.setGPCode(client_id, pos, "")
        elif op_type == 1:
            code = data["code"]
            if trade_manager.is_in_forbidden_trade_codes(code):
                return
            if not gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(
                    code) and not gpcode_manager.is_listen_full():
                client_id, pos = gpcode_manager.get_can_listen_pos()
                if pos is not None and client_id is not None:
                    L2CodeOperate.setGPCode(client_id, pos, code)
        elif op_type == 2:
            # Forced set on an explicit client / position.
            code = data["code"]
            client_id = data["client"]
            pos = data["pos"]
            state = L2CodeOperate.get_instance().get_operate_code_state(client_id, pos)
            # NOTE(review): state 1 presumably means the slot is already
            # consistent, so nothing needs repairing - confirm with the writer
            # of set_operate_code_state.
            if state == 1:
                return
            code_ = gpcode_manager.get_listen_code_by_pos(client_id, pos)
            if code_ == "" or code_ is None:
                return
            logger_code_operate.info("修复代码一致:{}-{}-{}".format(client_id, pos, code))
            L2CodeOperate.setGPCode(client_id, pos, code)
        elif op_type == 3:
            # Repair request built by repaire_l2_data: "client" sits next to
            # the payload on the outer message, and there is no "code" key.
            client = data["client"]
            payload = data["data"]
            # NOTE(review): setGPCode hands send_msg a dict, this branch a JSON
            # string - kept as in the original; confirm what send_msg expects.
            server.send_msg(client, json.dumps(payload))

    def add_operate(self, type, code):
        """Enqueue a request of the given ``type`` for ``code``."""
        redis = self.redis_manager_.getRedis()
        print("add_operate", type, code)
        redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code}))

    def repaire_operate(self, client, pos, code):
        """Enqueue a type-2 request forcing ``code`` onto ``client`` / ``pos``."""
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue", json.dumps({"type": 2, "client": client, "pos": pos, "code": code}))

    # Repair erroneous L2 data
    def repaire_l2_data(self, code):
        """Enqueue a type-3 ``repairL2Data`` request for a listened code.

        Nothing is queued when ``gpcode_manager.get_listen_code_pos`` yields
        no client or position for ``code``.
        """
        logger_code_operate.info("修复单票的L2数据:" + code)
        client_id, pos = gpcode_manager.get_listen_code_pos(code)
        if client_id is not None and pos is not None:
            # Fetch the limit-up and limit-down prices
            max_price = gpcode_manager.get_limit_up_price(code)
            min_price = gpcode_manager.get_limit_down_price(code)
            data = {"action": "repairL2Data",
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
            redis = self.redis_manager_.getRedis()
            redis.rpush("code_operate_queue", json.dumps({"type": 3, "client": client_id, "data": data}))

    # Remove a code from listening
    def remove_l2_listen(self, code):
        """Enqueue a type-0 (remove) request if ``code`` is being listened to."""
        # Only act when the code is currently listened to
        if gpcode_manager.is_listen(code):
            self.add_operate(0, code)

    # Operate state of a slot: whether the code stored on the server matches
    # the code actually set on the client
    def set_operate_code_state(self, client_id, channel, state):
        """Store ``state`` for a client channel in Redis.

        The key is ``code-operate_state-<client_id>-<channel>`` and expires
        after ``tool.get_expire()``.
        """
        self.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), tool.get_expire(), state)

    def get_operate_code_state(self, client_id, channel):
        """Return the stored state as ``int``, or ``None`` when no value exists."""
        value = self.getRedis().get("code-operate_state-{}-{}".format(client_id, channel))
        if value is not None:
            return int(value)
        return value
|