| | |
| | | import builtins |
| | | import copy |
| | | import hashlib |
| | | import json |
| | | import logging |
| | |
| | | import time |
| | | |
| | | import constant |
| | | import log |
| | | import socket_manager |
| | | from db import mysql_data |
| | | from db.redis_manager import RedisUtils, RedisManager |
| | | from log import logger_debug, logger_request_debug |
| | | from log_module import log |
| | | from log_module.log import logger_debug |
| | | from middle_l1_data_server import L1DataManager |
| | | from output import push_msg_manager |
| | | from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool |
| | | from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool, \ |
| | | block_web_api |
| | | from utils.juejin_util import JueJinHttpApi |
| | | |
| | | trade_data_request_queue = queue.Queue() |
| | | |
| | | __mysql_config_dict = {} |
| | | |
| | | |
| | | def get_mysql_config(db_name): |
| | | """ |
| | | 获取mysql的配置 |
| | | :param db_name: |
| | | :return: |
| | | """ |
| | | if db_name in __mysql_config_dict: |
| | | return __mysql_config_dict.get(db_name) |
| | | config = copy.deepcopy(constant.MYSQL_CONFIG) |
| | | config["database"] = db_name |
| | | __mysql_config_dict[db_name] = config |
| | | return config |
| | | |
| | | |
| | | class MyTCPServer(socketserver.TCPServer): |
| | |
| | | encoding='utf-8'))) |
| | | continue |
| | | type_ = data_json["type"] |
| | | log.request_info("middle_server", f"请求开始:{type_}") |
| | | __start_time = time.time() |
| | | try: |
| | | if data_json["type"] == 'register': |
| | | client_type = data_json["data"]["client_type"] |
| | |
| | | try: |
| | | data = data_json["data"] |
| | | datas = data["data"] |
| | | print("l2_subscript_codes", data_json) |
| | | # print("l2_subscript_codes", data_json) |
| | | global_data_cache_util.huaxin_subscript_codes = datas |
| | | global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str() |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "l2_subscript_codes_rate": |
| | | # 设置订阅的代码的涨幅 |
| | | try: |
| | | data = data_json["data"] |
| | | datas = data["data"] |
| | | # print("l2_subscript_codes", data_json) |
| | | global_data_cache_util.huaxin_subscript_codes_rate = datas |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "l2_position_subscript_codes": |
| | | # 设置订阅的代码 |
| | | try: |
| | | data = data_json["data"] |
| | | datas = data["data"] |
| | | print("l2_position_subscript_codes", data_json) |
| | | global_data_cache_util.huaxin_position_subscript_codes = datas |
| | | global_data_cache_util.huaxin_position_subscript_codes_update_time = tool.get_now_time_str() |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | elif data_json["type"] == "redis": |
| | |
| | | db = data["db"] |
| | | cmd = data["cmd"] |
| | | args = data.get("args") |
| | | mysql = mysql_data.Mysqldb() |
| | | mysql_config = get_mysql_config(db) |
| | | mysql = mysql_data.Mysqldb(mysql_config) |
| | | method = getattr(mysql, cmd) |
| | | args_ = [] |
| | | if args: |
| | |
| | | push_msg_manager.push_msg(_type, data) |
| | | result_str = json.dumps({"code": 0, "data": {}}) |
| | | sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) |
| | | finally: |
| | | log.request_info("middle_server", f"请求结束") |
| | | elif data_json["type"] == 'l1_data': |
| | | datas = data_json["data"] |
| | | L1DataManager().add_datas(datas) |
| | | break |
| | | elif data_json["type"] == 'get_l1_target_codes': |
| | | # 获取目标代码 |
| | | codes = L1DataManager().get_target_codes() |
| | | result_str = json.dumps({"code": 0, "data": list(codes)}) |
| | | sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) |
| | | break |
| | | # 获取三方板块 |
| | | elif data_json["type"] == 'get_third_blocks': |
| | | data = data_json["data"] |
| | | data = data["data"] |
| | | source = data["source"] |
| | | code = data["code"] |
| | | result_str = json.dumps({"code": 1, "msg": "source不匹配"}) |
| | | if source == 2: |
| | | # 通达信 |
| | | try: |
| | | blocks = block_web_api.get_tdx_blocks(code) |
| | | result_str = json.dumps({"code": 0, "data": list(blocks)}) |
| | | except Exception as e: |
| | | result_str = json.dumps({"code": 1, "msg": str(e)}) |
| | | elif source == 3: |
| | | # 同花顺 |
| | | try: |
| | | blocks = block_web_api.THSBlocksApi().get_ths_blocks(code) |
| | | result_str = json.dumps({"code": 0, "data": list(blocks)}) |
| | | except Exception as e: |
| | | try: |
| | | block_web_api.THSBlocksApi.load_hexin_v() |
| | | blocks = block_web_api.THSBlocksApi().get_ths_blocks(code) |
| | | result_str = json.dumps({"code": 0, "data": list(blocks)}) |
| | | except Exception as e1: |
| | | result_str = json.dumps({"code": 1, "msg": str(e1)}) |
| | | elif source == 4: |
| | | # 东方财富 |
| | | try: |
| | | blocks = block_web_api.get_eastmoney_block(code) |
| | | result_str = json.dumps({"code": 0, "data": list(blocks)}) |
| | | except Exception as e: |
| | | result_str = json.dumps({"code": 1, "msg": str(e)}) |
| | | sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) |
| | | break |
| | | elif data_json["type"] == 'low_suction': |
| | | # TODO 低吸通道 |
| | | datas = data_json["data"] |
| | | pass |
| | | |
| | | |
| | | |
| | | except Exception as e: |
| | | log.logger_tuoguan_request_debug.exception(e) |
| | | finally: |
| | | if time.time() - __start_time > 2: |
| | | log.logger_tuoguan_request_debug.info( |
| | | f"耗时:{int(time.time() - __start_time)}s 数据:{data_json}") |
| | | else: |
| | | # 断开连接 |
| | | break |
| | | # sk.close() |
| | | except Exception as e: |
| | | # log.logger_tuoguan_request_debug.exception(e) |
| | | logging.exception(e) |
| | | break |
| | | |
| | |
| | | pass |
| | | |
| | | |
| | | def run(): |
| | | def run(port=constant.MIDDLE_SERVER_PORT): |
| | | print("create MiddleServer") |
| | | t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) |
| | | t1.start() |
| | | |
| | | laddr = "0.0.0.0", constant.MIDDLE_SERVER_PORT |
| | | laddr = "0.0.0.0", port |
| | | print("MiddleServer is at: http://%s:%d/" % (laddr)) |
| | | tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 注意:参数是MyBaseRequestHandle |
| | | tcpserver.serve_forever() |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(builtins.type("")==str) |
| | | pass |