import json
|
import logging
|
import socketserver
|
import socket
|
import time
|
|
import data_process
|
import gpcode_manager
|
import authority
|
import juejin
|
import l2_data_manager
|
import tool
|
import trade_manager
|
import l2_code_operate
|
|
from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate
|
|
|
class MyTCPServer(socketserver.TCPServer):
|
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe=None):
|
self.pipe = pipe # 增加的参数
|
socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
|
|
|
# 如果使用异步的形式则需要再重写ThreadingTCPServer
|
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
|
reset_code_dict = {}
|
set_operate_code_state_dict = {}
|
l2_data_error_dict = {}
|
last_trade_delegate_data = None
|
|
def setup(self):
|
super().setup() # 可以不调用父类的setup()方法,父类的setup方法什么都没做
|
# print("----setup方法被执行-----")
|
# print("打印传入的参数:", self.server.pipe)
|
self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()
|
|
def handle(self):
|
host = self.client_address[0]
|
super().handle() # 可以不调用父类的handler(),方法,父类的handler方法什么都没做
|
# print("-------handler方法被执行----")
|
# print(self.server)
|
# print(self.request) # 服务
|
# print("客户端地址:", self.client_address) # 客户端地址
|
# print(self.__dict__)
|
# print("- " * 30)
|
# print(self.server.__dict__)
|
# print("- " * 30)
|
sk: socket.socket = self.request
|
while True:
|
data = sk.recv(102400)
|
if len(data) == 0:
|
# print("客户端断开连接")
|
break;
|
_str = data.decode()
|
if len(_str) > 0:
|
# print("结果:",_str)
|
type = data_process.parseType(_str)
|
return_str = "OK"
|
if type == 0:
|
|
try:
|
__start_time = round(time.time() * 1000)
|
# level2盘口数据
|
day, client, channel, code, datas = l2_data_manager.parseL2Data(_str)
|
|
try:
|
# print("L2数据接受",day,code,len(datas))
|
# 查询
|
code_ = gpcode_manager.get_listen_code_by_pos(client, channel)
|
if code_ != code:
|
key = "{}-{}-{}".format(client, channel, code)
|
|
# 间隔2s
|
if key not in self.reset_code_dict or round(
|
time.time() * 1000) - self.reset_code_dict[key] > 2000:
|
|
self.l2CodeOperate.set_operate_code_state(client, channel, 0)
|
self.reset_code_dict[key] = round(time.time() * 1000)
|
if code_ is None:
|
code_ = ""
|
if tool.is_trade_time():
|
self.l2CodeOperate.repaire_operate(int(client), int(channel), code_)
|
else:
|
key = "{}-{}".format(client, channel)
|
if key not in self.set_operate_code_state_dict or round(
|
time.time() * 1000) - self.set_operate_code_state_dict[key] > 1000:
|
self.set_operate_code_state_dict[key] = round(time.time() * 1000)
|
self.l2CodeOperate.set_operate_code_state(client, channel, 1)
|
|
if gpcode_manager.is_listen(code):
|
l2_data_manager.process_data(code, datas)
|
except l2_data_manager.L2DataException as l:
|
# 单价不符
|
if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
|
key = "{}-{}-{}".format(client, channel, code)
|
if key not in self.l2_data_error_dict or round(
|
time.time() * 1000) - self.l2_data_error_dict[key] > 2000:
|
self.l2CodeOperate.repaire_l2_data(code)
|
self.l2_data_error_dict[key] = round(time.time() * 1000)
|
|
except Exception as e:
|
print("异常", str(e))
|
logging.exception(e)
|
logger_l2_error.error("出错:{}".format(str(e)))
|
logger_l2_error.error("内容:{}".format(_str))
|
finally:
|
__end_time = round(time.time() * 1000)
|
# 只记录大于40ms的数据
|
if __end_time - __start_time > 40:
|
logger_l2_process.info("l2处理时间:{}-{}".format(code, __end_time - __start_time));
|
except:
|
pass
|
|
|
|
|
elif type == 10:
|
# level2交易队列
|
try:
|
code, setData = data_process.parseL2TradeQueueData(_str)
|
if gpcode_manager.is_listen(code):
|
data_process.saveL2Data(day, code, setData)
|
except:
|
print("异常")
|
|
elif type == 1:
|
# 设置股票代码
|
data_list = data_process.parseGPCode(_str)
|
data_process.saveZYLTSZ(data_list)
|
code_list = []
|
for data in data_list:
|
code_list.append(data["code"])
|
|
gpcode_manager.set_gp_list(code_list)
|
# 重新订阅
|
self.server.pipe.send(json.dumps({"type": "resub"}))
|
sync_target_codes_to_ths()
|
elif type == 2:
|
# 涨停代码
|
codeList = data_process.parseGPCode(_str)
|
gpcode_manager.set_limit_up_list(codeList)
|
elif type == 3:
|
# 交易成功信息
|
dataList = data_process.parseList(_str)
|
try:
|
trade_manager.process_trade_success_data(dataList)
|
except Exception as e:
|
logging.exception(e)
|
trade_manager.save_trade_success_data(dataList)
|
|
elif type == 5:
|
# 交易委托信息
|
dataList = data_process.parseList(_str)
|
if self.last_trade_delegate_data != _str:
|
self.last_trade_delegate_data = _str
|
# 保存委托信息
|
logger_trade_delegate.info(dataList)
|
try:
|
trade_manager.process_trade_delegate_data(dataList)
|
except Exception as e:
|
logging.exception(e)
|
trade_manager.save_trade_delegate_data(dataList)
|
|
elif type == 4:
|
# 行业代码信息
|
dataList = data_process.parseList(_str)
|
data_process.saveIndustryCode(dataList)
|
elif type == 6:
|
# 可用金额
|
datas = data_process.parseData(_str)
|
client = datas["client"]
|
money = datas["money"]
|
# TODO存入缓存文件
|
trade_manager.set_available_money(client, money)
|
elif type == 20:
|
# 登录
|
data = data_process.parse(_str)["data"]
|
try:
|
client_id, _authoritys = authority.login(data["account"], data["pwd"])
|
return_str = data_process.toJson(
|
{"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
|
except Exception as e:
|
return_str = data_process.toJson({"code": 1, "msg": str(e)})
|
elif type == 40:
|
data = data_process.parse(_str)["data"]
|
for item in data:
|
juejin.accpt_price(item["code"], float(item["price"]))
|
|
elif type == 30:
|
data = data_process.parse(_str)["data"]
|
client_id = data["client"]
|
if "memery" in data:
|
mem = data["memery"]
|
logger_device.info("({})内存使用率:{}".format(client_id, mem))
|
data_process.saveClientActive(int(client_id), host)
|
# print("心跳:", client_id)
|
|
sk.send(return_str.encode())
|
|
# print("----------handler end ----------")
|
|
def finish(self):
|
super().finish() # 可以不调用父类的finish(),方法,父类的finish方法什么都没做
|
# print("--------finish方法被执行---")
|
|
|
def send_msg(client_id, data):
|
_ip = data_process.getActiveClientIP(client_id)
|
print("ip", client_id, _ip)
|
if _ip is None or len(_ip) <= 0:
|
raise Exception("客户端IP为空")
|
socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
socketClient.connect((_ip, 9006))
|
# 连接socket
|
try:
|
socketClient.send(json.dumps(data).encode())
|
recv = socketClient.recv(1024)
|
result = recv.decode().lstrip()
|
return result
|
finally:
|
socketClient.close()
|
|
|
# 客户端心跳机制
|
def test_client_server():
|
while True:
|
clients = authority.get_l2_clients()
|
for client in clients:
|
print("心跳", client)
|
try:
|
send_msg(client, {"action": "test"})
|
except:
|
pass
|
|
time.sleep(5)
|
|
|
# 获取采集客户端的状态
|
def get_client_env_state(client):
|
result = send_msg(client, {"action": "getEnvState"})
|
result = json.loads(result)
|
if result["code"] == 0:
|
return json.loads(result["data"])
|
else:
|
raise Exception(result["msg"])
|
|
|
# 修复采集客户端
|
def repair_client_env(client):
|
result = send_msg(client, {"action": "repairEnv"})
|
result = json.loads(result)
|
if result["code"] != 0:
|
raise Exception(result["msg"])
|
|
|
# 同步目标标的到同花顺
|
def sync_target_codes_to_ths():
|
codes = gpcode_manager.get_gp_list()
|
code_list = []
|
for code in codes:
|
code_list.append(code)
|
client = authority._get_client_ids_by_rule("client-industry")
|
result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})
|
return result
|
|
|
if __name__ == "__main__":
|
try:
|
result = get_client_env_state(3)
|
print(result)
|
except Exception as e:
|
print(str(e))
|