import builtins
|
import copy
|
import hashlib
|
import json
|
import logging
|
import queue
|
import random
|
import socket
|
import socketserver
|
import threading
|
import time
|
|
import constant
|
from log_module import log
|
|
from trade import huaxin_trade_api
|
from utils import socket_util
|
|
# Module-level queue for trade-data requests. Nothing in this part of the
# file puts to it or reads from it.
trade_data_request_queue: queue.Queue = queue.Queue()

# MySQL settings already built by get_mysql_config(), keyed by database name.
__mysql_config_dict: dict = {}
|
|
|
def get_mysql_config(db_name):
    """
    Return the MySQL settings for one database.

    The settings are a deep copy of ``constant.MYSQL_CONFIG`` whose
    ``"database"`` entry is set to ``db_name``. The copy is built on the first
    request for a name and remembered in ``__mysql_config_dict``, so later
    calls with the same name return the very same dict object.

    :param db_name: database name to store under the ``"database"`` key
    :return: the cached settings dict for ``db_name``
    """
    try:
        # Fast path: settings for this database were built earlier.
        return __mysql_config_dict[db_name]
    except KeyError:
        pass
    settings = copy.deepcopy(constant.MYSQL_CONFIG)
    settings["database"] = db_name
    __mysql_config_dict[db_name] = settings
    return settings
|
|
|
class MyTCPServer(socketserver.TCPServer):
    """TCP server that always binds and activates its socket on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        """
        Create the server and start listening immediately.

        :param server_address: address forwarded to ``socketserver.TCPServer``
        :param RequestHandlerClass: handler class forwarded to
            ``socketserver.TCPServer``
        """
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
|
|
|
# If requests are to be handled asynchronously, ThreadingTCPServer has to be
# overridden as well.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """``MyTCPServer`` combined with ``socketserver.ThreadingMixIn``."""
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """
    Request handler for the middle server's TCP connections.

    A connection is served in a loop: one message is read with
    :meth:`getRecvData`, parsed as JSON and dispatched on its ``"type"``
    field. Only ``"simulation_trade"`` is acted upon in this class; any other
    type is read and ignored without a reply.
    """

    # Set to True once the class-level state has been created by __init().
    __inited = False

    def setup(self):
        """Run the one-time class initialisation before the request is handled."""
        self.__init()

    @classmethod
    def __init(cls):
        """
        Create the class-level state the first time a handler is set up.

        :return: ``True`` when the state already existed, otherwise ``None``
        """
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """
        Verify the ``"sign"`` entry of a request dict.

        The expected signature is the hex MD5 digest of all other entries
        rendered as ``key=value``, sorted, joined with ``&`` and followed by a
        fixed suffix. The caller's dict is left unmodified.

        :param data_json: request dict that carries a ``"sign"`` entry
        :raises KeyError: if ``data_json`` has no ``"sign"`` entry
        :raises Exception: if the signature does not match
        """
        sign = data_json["sign"]
        pairs = sorted(f"{key}={value}" for key, value in data_json.items() if key != "sign")
        plain = "&".join(pairs) + "JiaBei@!*."
        digest = hashlib.md5(plain.encode(encoding='utf-8')).hexdigest()
        if digest != sign:
            raise Exception("签名出错")

    @classmethod
    def getRecvData(cls, skk):
        """
        Read one message from ``skk``.

        Two wire formats are accepted:

        * framed - a 10-byte header made of ``##`` plus an 8-character body
          length in bytes, followed by exactly that many body bytes;
        * unframed - anything not starting with ``##``: the bytes already read
          plus whatever a single further ``recv`` of up to 1 MiB returns.

        :param skk: connected socket-like object providing ``recv``
        :return: ``(data, header)`` where ``data`` is the decoded text (``""``
            when the peer closed the connection, including in the middle of a
            frame) and ``header`` is the raw bytes read as the header
        :raises ValueError: if the length field is not an integer or the
            payload is not valid UTF-8
        """
        header_size = 10
        frame_marker = b"##"
        max_chunk = 10240

        header = skk.recv(header_size)
        if not header:
            return "", header

        # A stream socket may hand back fewer bytes than requested. Top the
        # header up, but only while what we have can still be a frame marker,
        # so short unframed messages are not delayed.
        while len(header) < header_size and frame_marker.startswith(header[:2]):
            more = skk.recv(header_size - len(header))
            if not more:
                break
            header += more

        if not header.startswith(frame_marker):
            rest = skk.recv(1024 * 1024)
            # Decode once over the joined bytes so a multi-byte character that
            # straddles the two reads is not broken.
            return (header + rest).decode('utf-8'), header

        if len(header) < header_size:
            # The peer went away before the full header arrived.
            return "", header

        remaining = int(header[2:header_size].decode('utf-8'))
        chunks = []
        while remaining > 0:
            # Never ask for more than this frame still needs; otherwise bytes
            # of the next message would be consumed and the count could never
            # match the announced length.
            chunk = skk.recv(min(max_chunk, remaining))
            if not chunk:
                # Peer closed mid-frame: report it as a disconnect instead of
                # looping on empty reads forever.
                return "", header
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks).decode('utf-8'), header

    def handle(self):
        """
        Serve requests on this connection until the peer disconnects.

        Invalid JSON is answered with ``{"code": 100, ...}`` and the loop
        continues. Any error outside request processing is logged and ends
        the loop.
        """
        super().handle()
        sk: socket.socket = self.request
        while True:
            try:
                data_str, _ = self.getRecvData(sk)
                if not data_str:
                    # Connection closed by the peer
                    break
                try:
                    data_json = json.loads(data_str)
                except json.decoder.JSONDecodeError:
                    # JSON parsing failed
                    error_body = json.dumps({"code": 100, "msg": "JSON解析失败"})
                    sk.sendall(socket_util.load_header(error_body.encode(encoding='utf-8')))
                    continue
                # A message without "type" raises here and is handled by the
                # outer except, which ends the connection.
                request_type = data_json["type"]
                started_at = time.time()
                try:
                    if request_type == "simulation_trade":
                        datas = data_json["data"]
                        ctype = datas["ctype"]
                        payload = datas["data"]
                        result = huaxin_trade_api.request(ctype, payload)
                        result_str = json.dumps({"code": 0, "data": result}, default=str)
                        sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                except Exception as e:
                    # NOTE(review): no reply is sent when processing fails, so
                    # the client receives nothing for this request.
                    log.logger_tuoguan_request_debug.exception(e)
                finally:
                    # Record requests that took longer than two seconds.
                    elapsed = time.time() - started_at
                    if elapsed > 2:
                        log.logger_tuoguan_request_debug.info(
                            f"耗时:{int(elapsed)}s 数据:{data_json}")
            except Exception as e:
                logging.exception(e)
                break

    def finish(self):
        """Delegate to the base class; no extra cleanup is done here."""
        super().finish()
|
|
|
def __recv_pipe_l1(pipe_trade, pipe_l1):
    """
    Receive JSON messages from the L1 pipe until it is closed.

    Returns immediately unless both pipes are given. Each non-empty message is
    parsed as JSON and printed. A message that cannot be processed is logged
    and skipped; a closed or broken pipe ends the loop.

    :param pipe_trade: trade-side pipe; only checked for ``None`` here
    :param pipe_l1: connection whose ``recv()`` yields JSON text
    """
    if pipe_trade is None or pipe_l1 is None:
        return
    while True:
        try:
            val = pipe_l1.recv()
            if val:
                val = json.loads(val)
                print("收到来自L1的数据:", val)
                # Process the data
        except (EOFError, OSError):
            # The other end is gone. recv() would fail again at once on every
            # call, so stop instead of spinning.
            break
        except Exception as e:
            # Best effort: a single bad message must not stop the receiver.
            logging.exception(e)
|
|
|
def run(port=constant.MIDDLE_SERVER_PORT):
    """
    Start the middle server on all interfaces and serve until stopped.

    The call blocks inside ``serve_forever()``. The listening socket is
    closed when serving ends, whether by shutdown or by an exception.

    :param port: TCP port to listen on; defaults to
        ``constant.MIDDLE_SERVER_PORT``
    """
    print("create MiddleServer")
    laddr = "0.0.0.0", port
    print("MiddleServer is at: http://%s:%d/" % laddr)
    # Note: the handler argument is MyBaseRequestHandle.
    # The with-block guarantees server_close() runs on the way out.
    with MyThreadingTCPServer(laddr, MyBaseRequestHandle) as tcpserver:
        tcpserver.serve_forever()
|
|
|
if __name__ == "__main__":
    # Running this module directly does nothing; the server is started by
    # calling run().
    pass
|