"""
|
接受客户端数据的服务器
|
"""
|
|
import json
|
import logging
|
import socketserver
|
import socket
|
import threading
|
import time
|
|
import alert_util
|
import client_manager
|
import code_volumn_manager
|
import data_process
|
import global_data_loader
|
import global_util
|
import gpcode_manager
|
import authority
|
import juejin
|
import l2_data_log
|
import l2_data_manager
|
import l2_data_manager_new
|
import l2_data_util
|
import ths_industry_util
|
import ths_util
|
import tool
|
import trade_manager
|
import l2_code_operate
|
from code_data_util import ZYLTGBUtil
|
|
from log import logger_l2_error, logger_device, logger_trade_delegate
|
from trade_queue_manager import THSBuy1VolumnManager
|
|
|
class MyTCPServer(socketserver.TCPServer):
    """TCPServer that additionally carries two caller-supplied pipe objects.

    The objects are stored untouched as ``pipe_juejin`` and ``pipe_ui`` on the
    server instance; this class does not use them itself.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None):
        """Store the extra pipe arguments, then run the stock TCPServer initialisation."""
        # Extra arguments on top of the standard TCPServer signature.
        self.pipe_juejin, self.pipe_ui = pipe_juejin, pipe_ui
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
|
|
|
# To serve requests asynchronously, the threading server has to be re-declared on top of MyTCPServer.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with ThreadingMixIn; adds no members of its own."""
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Per-connection handler: decodes GBK messages and dispatches them by type.

    Every chunk returned by ``recv`` is treated as one complete message.
    ``data_process.parseType`` yields an integer that selects a branch in
    :meth:`handle`:

    ====  ==========================================
    type  handled as
    ====  ==========================================
    0     level-2 order-book data
    1     set the stock code list
    2     limit-up codes
    3     trade success records
    4     industry code records
    5     trade delegate (order) records
    6     available money
    20    login
    30    heartbeat
    40    current-price update
    50    buy-1 volume update
    ====  ==========================================

    The reply sent back is ``"OK"`` unless the branch builds its own (login).

    NOTE(review): the nesting in this class was reconstructed from a source
    whose indentation had been lost. Spots where more than one nesting would be
    valid Python are marked ``NOTE(review)`` and should be confirmed against
    the original file.
    """

    # "client-channel-code" -> time (ms) the last price error was acted on.
    # Class-level dict, mutated in place, so it is shared by all handlers.
    l2_data_error_dict = {}
    # Text of the last trade-delegate message. It is assigned through ``self``,
    # so after the first write each handler instance has its own copy.
    last_trade_delegate_data = None
    # Shared manager used by the type-50 (buy-1 volume) branch.
    buy1_volumn_manager = THSBuy1VolumnManager()

    def setup(self):
        """Grab the L2CodeOperate singleton before the connection is served."""
        super().setup()  # optional: the base-class setup() does nothing
        self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()

    def handle(self):
        """Read messages from the client until it disconnects, replying to each one."""
        host = self.client_address[0]
        super().handle()  # optional: the base-class handle() does nothing
        sk: socket.socket = self.request
        while True:
            data = sk.recv(102400)
            if len(data) == 0:
                # The client closed the connection.
                break
            _str = str(data, encoding="gbk")
            if len(_str) > 0:
                # Named msg_type so the builtin ``type`` is not shadowed.
                msg_type = data_process.parseType(_str)
                return_str = "OK"
                if msg_type == 0:
                    try:
                        origin_start_time = round(time.time() * 1000)
                        step_start = round(time.time() * 1000)

                        # Level-2 order-book data.
                        (day, client, channel, code, capture_time, process_time, datas,
                         origin_datas) = l2_data_manager.parseL2Data(_str)
                        # Allow 10 ms for network transfer delay.
                        capture_timestamp = step_start - process_time - 10
                        step_start = l2_data_log.l2_time(code, round(time.time() * 1000) - step_start,
                                                         "截图时间:{} 数据解析时间".format(process_time))

                        cid, pid = gpcode_manager.get_listen_code_pos(code)

                        step_start = l2_data_log.l2_time(code, round(time.time() * 1000) - step_start,
                                                         "l2获取代码位置耗时")
                        # Only process when the registered position of the code matches
                        # the position the data was uploaded from.
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                            try:
                                # Verify the code the client is showing at this position.
                                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                step_start = round(time.time() * 1000)
                                if gpcode_manager.is_listen(code):
                                    step_start = l2_data_log.l2_time(code,
                                                                     round(time.time() * 1000) - step_start,
                                                                     "l2外部数据预处理耗时")
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                    step_start = l2_data_log.l2_time(code,
                                                                     round(time.time() * 1000) - step_start,
                                                                     "l2数据有效处理外部耗时",
                                                                     False)
                                    # NOTE(review): the next two statements could also sit one
                                    # level out (run even when the code is not listened) — confirm.
                                    # Save the number of raw data rows.
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                    if round(time.time() * 1000) - step_start > 20:
                                        l2_data_log.l2_time(code, round(time.time() * 1000) - step_start,
                                                            "异步保存原始数据条数耗时",
                                                            False)

                            except l2_data_manager.L2DataException as l2_err:
                                # Unit price mismatch.
                                if l2_err.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                    key = "{}-{}-{}".format(client, channel, code)
                                    # Act at most once every 10 s per client/channel/code.
                                    if key not in self.l2_data_error_dict or round(
                                            time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                        # TODO: too sensitive — this removes the code.
                                        # Pre-formatted like the other logger calls in this handler,
                                        # so it does not depend on the logger's argument style.
                                        logger_l2_error.warning("code-{} l2单价错误:{}".format(code, l2_err.msg))
                                        # On a price mismatch the code is removed so it can be re-added.
                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                        # NOTE(review): assumed to belong to the throttle branch — confirm.
                                        self.l2_data_error_dict[key] = round(time.time() * 1000)

                            except Exception as e:
                                print("异常", str(e))
                                logging.exception(e)
                                logger_l2_error.error("出错:{}".format(str(e)))
                                logger_l2_error.error("内容:{}".format(_str))
                            finally:
                                end_time = round(time.time() * 1000)
                                # Only record runs that took more than 100 ms in total.
                                if end_time - origin_start_time > 100:
                                    l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                                        "l2数据处理总耗时",
                                                        True)
                    except Exception as e:
                        logging.exception(e)
                elif msg_type == 1:
                    # Set the stock code list.
                    data_list = data_process.parseGPCode(_str)
                    ZYLTGBUtil.save_list(data_list)
                    code_list = [item["code"] for item in data_list]

                    # Fetch the basic information of those codes.
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    gpcode_manager.set_gp_list(code_datas)

                    # Push the target codes to THS in the background.
                    t1 = threading.Thread(target=sync_target_codes_to_ths, daemon=True)
                    t1.start()
                elif msg_type == 2:
                    # Limit-up codes.
                    dataList = data_process.parseGPCode(_str)
                    # Set the limit-up time.
                    gpcode_manager.set_limit_up_list(dataList)
                    # Keep them in memory.
                    if dataList:
                        global_data_loader.add_limit_up_codes(dataList)
                    # NOTE(review): assumed to run regardless of the check above — confirm.
                    ths_industry_util.set_industry_hot_num(dataList)
                elif msg_type == 3:
                    # Trade success records.
                    dataList = data_process.parseList(_str)
                    try:
                        trade_manager.process_trade_success_data(dataList)
                    except Exception as e:
                        logging.exception(e)
                    # NOTE(review): assumed to run after the try block, whether or not
                    # processing failed — confirm.
                    trade_manager.save_trade_success_data(dataList)

                elif msg_type == 5:
                    # Trade delegate (order) records.
                    dataList = data_process.parseList(_str)
                    if self.last_trade_delegate_data != _str:
                        self.last_trade_delegate_data = _str
                        # Log the delegate records only when the message changed.
                        logger_trade_delegate.info(dataList)
                    # NOTE(review): processing/saving assumed to run for every message,
                    # not only for changed ones — confirm.
                    try:
                        trade_manager.process_trade_delegate_data(dataList)
                    except Exception as e:
                        logging.exception(e)
                    trade_manager.save_trade_delegate_data(dataList)

                elif msg_type == 4:
                    # Industry code records.
                    dataList = data_process.parseList(_str)
                    ths_industry_util.save_industry_code(dataList)
                elif msg_type == 6:
                    # Available money.
                    datas = data_process.parseData(_str)
                    client = datas["client"]
                    money = datas["money"]
                    # TODO: persist to a cache file.
                    trade_manager.set_available_money(client, money)
                elif msg_type == 20:
                    # Login.
                    data = data_process.parse(_str)["data"]
                    try:
                        client_id, _authoritys = authority.login(data["account"], data["pwd"])
                        return_str = data_process.toJson(
                            {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
                    except Exception as e:
                        return_str = data_process.toJson({"code": 1, "msg": str(e)})
                # Current-price update.
                elif msg_type == 40:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        print("现价数量", len(data))
                        for item in data:
                            volumn = item["volumn"]
                            volumnUnit = item["volumnUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        juejin.accpt_prices(data)
                elif msg_type == 50:
                    # Buy-1 volume update.
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        index = data["index"]
                        code_name = data["codeName"]
                        volumn = data["volumn"]
                        price = data["price"]
                        time_ = data["time"]
                        code = global_util.name_codes.get(code_name)
                        if code is None:
                            # Unknown name: reload the name -> code map and retry once.
                            global_data_loader.load_name_codes()
                            code = global_util.name_codes.get(code_name)
                        if code is not None:
                            # Align the time down to a multiple of 3 seconds.
                            seconds = tool.get_time_as_second(time_)
                            if seconds % 3 > 0:
                                seconds = seconds - seconds % 3
                            time_ = tool.time_seconds_format(seconds)
                            # Save the data.
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg)
                            if need_sync:
                                # Synchronise the data.
                                l2_data_manager.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)

                elif msg_type == 30:
                    # Heartbeat.
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    thsDead = data.get("thsDead")
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    client_manager.saveClientActive(int(client_id), host, thsDead)
                    if ths_util.is_ths_dead(client_id):
                        # TODO: restart THS.
                        # Raise an alarm when the dead client is one of the L2 clients.
                        l2_clients = authority.get_l2_clients()
                        if client_id in l2_clients:
                            alert_util.alarm()
                # sendall: a plain send() may transmit only part of the reply.
                sk.sendall(return_str.encode())

    def finish(self):
        """Nothing to clean up beyond the base class."""
        super().finish()  # optional: the base-class finish() does nothing
|
|
|
def send_msg(client_id, data):
    """Send one JSON request to a client and return its reply as text.

    The client's address is looked up with ``client_manager.getActiveClientIP``
    and contacted on TCP port 9006. ``data`` is serialised with ``json.dumps``;
    the reply is a single ``recv`` of at most 1024 bytes decoded as GBK.

    :param client_id: identifier passed to ``client_manager.getActiveClientIP``.
    :param data: JSON-serialisable request payload.
    :return: the decoded reply string.
    :raises Exception: when no IP is known for the client.
    :raises OSError: when connecting, sending or receiving fails.
    """
    _ip = client_manager.getActiveClientIP(client_id)
    print("ip", client_id, _ip)
    if _ip is None or len(_ip) <= 0:
        raise Exception("客户端IP为空")
    # The context manager closes the socket on every path, including a failed
    # connect(), which previously left the socket open.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socketClient:
        socketClient.connect((_ip, 9006))
        # sendall: a plain send() may transmit only part of the payload.
        socketClient.sendall(json.dumps(data).encode())
        # NOTE(review): replies longer than 1024 bytes are truncated here — confirm
        # the client never sends more.
        recv = socketClient.recv(1024)
        return str(recv, encoding="gbk")
|
|
|
# Client heartbeat loop
def test_client_server():
    """Ping every L2 client forever, then correct client codes, every 5 seconds.

    Never returns. A failed ping is best-effort: it is logged at debug level
    and the loop moves on to the next client.
    """
    while True:
        clients = authority.get_l2_clients()
        for client in clients:
            print("心跳", client)
            try:
                send_msg(client, {"action": "test"})
            except Exception:
                # Still best-effort, but no longer a bare ``except`` that would also
                # swallow KeyboardInterrupt / SystemExit.
                logging.debug("heartbeat to client %s failed", client, exc_info=True)
        # Correct the codes held by the clients.
        # NOTE(review): assumed to run once per round, after all clients were
        # pinged — confirm against the original indentation.
        l2_code_operate.correct_client_codes()
        time.sleep(5)
|
|
|
# Get the state of a capture client
def get_client_env_state(client):
    """Request ``getEnvState`` from a client and return the decoded state.

    The reply is JSON; when its ``code`` is 0 the ``data`` field (itself a JSON
    string) is decoded and returned, otherwise ``Exception(msg)`` is raised.
    """
    reply = json.loads(send_msg(client, {"action": "getEnvState"}))
    if reply["code"] != 0:
        raise Exception(reply["msg"])
    return json.loads(reply["data"])
|
|
|
# Repair a capture client
def repair_client_env(client):
    """Send ``repairEnv`` to a client; raise ``Exception(msg)`` unless the reply's ``code`` is 0."""
    reply = json.loads(send_msg(client, {"action": "repairEnv"}))
    if reply["code"] == 0:
        return
    raise Exception(reply["msg"])
|
|
|
# Sync the target codes to THS
def sync_target_codes_to_ths():
    """Send the current code list to the first "client-industry" client.

    The codes come from ``gpcode_manager.get_gp_list()`` and are sent as a list
    with the ``syncTargetCodes`` action; the raw reply of ``send_msg`` is
    returned.
    """
    target_codes = list(gpcode_manager.get_gp_list())
    industry_clients = authority._get_client_ids_by_rule("client-industry")
    return send_msg(industry_clients[0], {"action": "syncTargetCodes", "data": target_codes})
|
|
|
# Repair the THS main site
def repair_ths_main_site(client):
    """Send ``updateTHSSite`` to a client; raise ``Exception(msg)`` unless the reply's ``code`` is 0."""
    reply = json.loads(send_msg(client, {"action": "updateTHSSite"}))
    if reply["code"] == 0:
        return
    raise Exception(reply["msg"])
|
|
|
if __name__ == "__main__":
    # Manual run: ask client 2 to update its THS site; print a failure instead of crashing.
    try:
        repair_ths_main_site(2)
    except Exception as err:
        print(err)
|