From a0f4a1d5bed0b4be8be122e90d2f95b76f178a94 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 21 十一月 2024 17:41:22 +0800 Subject: [PATCH] 精简代码/代码归类 --- /dev/null | 538 ----------------------------------- task/task_manager.py | 111 +++++++ main.py | 59 +-- servers/huaxin_trade_server.py | 142 --------- trade/huaxin/huaxin_trade_data_update.py | 8 5 files changed, 138 insertions(+), 720 deletions(-) diff --git a/main.py b/main.py index f463ac8..1a98d6f 100644 --- a/main.py +++ b/main.py @@ -11,35 +11,29 @@ import os import threading +from task import task_manager + logger_system.info("绋嬪簭鍚姩Pre锛歿}", os.getpid()) -from db import redis_manager_delegate as redis_manager import huaxin_client.trade_client import huaxin_client.l2_client import huaxin_client.l1_client from huaxin_client import l2_market_client -# 浜ゆ槗鏈嶅姟 - -# from huaxin_api import trade_client, l2_client, l1_client -from servers import server_util, huaxin_trade_api_server, huaxin_trade_server, server +from servers import server_util, huaxin_trade_server, server -def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, - queue_l1_w_strategy_r_: multiprocessing.Queue, - queue_strategy_w_trade_r_: multiprocessing.Queue, - queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, - queue_l1_trade_w_strategy_r_, trade_ipc_addr): +def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue, + queue_l1_w_strategy_r_: multiprocessing.Queue, + queue_strategy_w_trade_r_: multiprocessing.Queue, + queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, + trade_ipc_addr): """ 绛栫暐杩涚▼ - - @param pipe_server: @param queue_strategy_r_trade_w_: @param queue_l1_w_strategy_r_: @param queue_strategy_w_trade_r_: @param queue_strategy_w_trade_r_for_read_: - @param queue_l1_trade_r_strategy_w_: - @param queue_l1_trade_w_strategy_r_: @param trade_ipc_addr: 浜ゆ槗ipc鍦板潃(涓嬪崟鍦板潃, 鎾ゅ崟鍦板潃) @return: """ @@ -48,25 +42,18 @@ # 鍒濆鍖栧弬鏁� server.global_data_loader.init() - # 鏁版嵁鏈嶅姟 - t1 = threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True) - t1.start() + # 寮�鍚暟鎹湇鍔″櫒 + threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start() - # 浜ゆ槗鎺ュ彛鏈嶅姟 - t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", - args=(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w_), - daemon=True) - t1.start() - # - # redis鍚庡彴鏈嶅姟 - t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) - t1.start() - + # 杩愯鏁版嵁鐩戝惉鏈嶅姟 + threading.Thread(target=task_manager.run_data_listener, name="task_manager", + args=(queue_other_w_l2_r, queue_l1_w_strategy_r_), + daemon=True).start() # # 鍚姩鍗庨懌浜ゆ槗鏈嶅姟 - huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, + huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, - queue_l1_trade_w_strategy_r_, trade_ipc_addr) + trade_ipc_addr) # 涓绘湇鍔� @@ -91,17 +78,12 @@ logger_l2_trade.info("鍚姩绋嬪簭") logger_system.info("鍚姩绋嬪簭--------") log.close_print() - # 绛栫暐涓巗erver闂寸殑閫氫俊 - pss_server, pss_strategy = multiprocessing.Pipe() # L2璇诲叾浠栧啓 queue_other_w_l2_r = multiprocessing.Queue() # l1 queue_l1_w_strategy_r = multiprocessing.Queue() queue_l1_r_strategy_w = multiprocessing.Queue() - # l1浜ゆ槗 - queue_l1_trade_w_strategy_r = multiprocessing.Queue() - queue_l1_trade_r_strategy_w = multiprocessing.Queue() # 浜ゆ槗璇荤瓥鐣ュ啓 queue_strategy_w_trade_r = multiprocessing.Queue() @@ -112,9 +94,6 @@ # 涓嬪崟,鎾ゅ崟ipc鍦板潃 order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc" - # 鎵樼鐜涓嬩笉鍒涘缓 - # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) - # serverProcess.start() logger_system.info("涓昏繘绋婭D锛歿}", os.getpid()) # L1璁㈤槄鏁版嵁 @@ -149,9 +128,9 @@ # cpu_indexes = [i for i in range(23, 30)] # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes) # 涓昏繘绋� - createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, - queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w, - queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr)) + run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, + queue_strategy_w_trade_r_for_read, + (order_ipc_addr, cancel_order_ipc_addr)) # 灏唗radeServer浣滀负涓昏繘绋� l1Process.join() diff --git a/servers/huaxin_trade_api_server.py b/servers/huaxin_trade_api_server.py deleted file mode 100644 index b6c1b09..0000000 --- a/servers/huaxin_trade_api_server.py +++ /dev/null @@ -1,538 +0,0 @@ -import hashlib -import json -import logging -import multiprocessing -import socket -import socketserver -import threading -import time - -import psutil - -import inited_data -from code_attribute import gpcode_manager -from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager -from db.redis_manager_delegate import RedisUtils -from l2 import l2_data_manager_new -from l2.huaxin import huaxin_target_codes_manager -from log_module.log import logger_system, logger_l2_codes_subscript -from third_data import block_info, kpl_api -from third_data.code_plate_key_manager import KPLCodeJXBlockManager -from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi -from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager -from third_data.kpl_util import KPLDataType -from trade import trade_manager, trade_huaxin, l2_trade_util, trade_constant - -from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager, \ - huaxin_trade_data_update -from trade.huaxin.huaxin_trade_api import ClientSocketManager -from trade.huaxin.huaxin_trade_order_processor import TradeResultProcessor, HuaxinOrderEntity -from utils import socket_util, tool, huaxin_util, data_export_util - - -class MyTCPServer(socketserver.TCPServer): - def __init__(self, server_address, RequestHandlerClass): - socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True) - - -# 濡傛灉浣跨敤寮傛鐨勫舰寮忓垯闇�瑕佸啀閲嶅啓ThreadingTCPServer -class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass - - -class MyBaseRequestHandle(socketserver.BaseRequestHandler): - __inited = False - - def setup(self): - self.__init() - - @classmethod - def __init(cls): - if cls.__inited: - return True - cls.__inited = True - cls.__req_socket_dict = {} - - def __is_sign_right(self, data_json): - list_str = [] - sign = data_json["sign"] - data_json.pop("sign") - for k in data_json: - list_str.append(f"{k}={data_json[k]}") - list_str.sort() - __str = "&".join(list_str) + "JiaBei@!*." - md5 = hashlib.md5(__str.encode(encoding='utf-8')).hexdigest() - if md5 != sign: - raise Exception("绛惧悕鍑洪敊") - - def handle(self): - host = self.client_address[0] - super().handle() - sk: socket.socket = self.request - while True: - return_str = "" - try: - data, header = socket_util.recv_data(sk) - if data: - data_str = data - data_json = json.loads(data_str) - type_ = data_json['type'] - - is_sign_right = socket_util.is_client_params_sign_right(data_json) - # ------瀹㈡埛绔姹傛帴鍙�------- - if type_ == 'buy': - # 楠岃瘉绛惧悕 - if not is_sign_right: - raise Exception("绛惧悕閿欒") - codes_data = data_json["data"] - code = codes_data["code"] - volume = codes_data["volume"] - price = codes_data["price"] - try: - if not code: - raise Exception("璇蜂笂浼燾ode") - if not volume: - raise Exception("璇蜂笂浼爒olume") - - if round(float(price), 2) <= 0: - prices = HistoryKDatasUtils.get_now_price([code]) - if not prices: - raise Exception("鐜颁环鑾峰彇澶辫触") - price = prices[0][1] - # 涓嬪崟 - result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume, - round(float(price), 2)) - if result: - resultJSON = result - # - # {'code': 0, 'data': {'sinfo': 'b_600480_1689060343812', 'securityId': '600480', - # 'orderLocalId': '0190000809', 'orderStatus': '7', 'statusMsg': - # '10932:浜у搧鐘舵�佽祫婧愯闂巿鏉冧笉瓒�', 'orderSysID': '110010190000809', 'accountId': - # '38800001334901'}} - if resultJSON['code'] == 0: - try: - resultJSON = resultJSON['data'] - statusCode = resultJSON['orderStatus'] - - if statusCode == huaxin_util.TORA_TSTP_OST_Rejected: - # 浜ゆ槗鎵�鎷掔粷 - raise Exception(resultJSON['statusMsg']) - else: - # code, orderStatus, orderRef, accountID, orderSysID, insertTime=None - order = HuaxinOrderEntity(resultJSON['securityId'], statusCode, - resultJSON['orderRef'], resultJSON['accountID'], - resultJSON['orderSysID'], - resultJSON['insertTime'], - acceptTime=resultJSON['acceptTime'], - direction=resultJSON['direction']) - TradeResultProcessor.order_success(order) - return_str = json.dumps({"code": 0}) - finally: - # 鏇存柊濮旀墭鍒楄〃 - huaxin_trade_data_update.add_delegate_list("鎺ュ彛") - huaxin_trade_data_update.add_money_list() - else: - raise Exception(resultJSON['msg']) - break - except Exception as e: - raise e - elif type_ == 'cancel_order': - # 楠岃瘉绛惧悕 - if not is_sign_right: - raise Exception("绛惧悕閿欒") - codes_data = data_json["data"] - code = codes_data["code"] - orderSysID = codes_data.get("orderSysID") - accountId = codes_data.get("accountId") - if code and orderSysID and accountId: - result = huaxin_trade_api.cancel_order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, - orderSysID, True) - print(result) - if result["code"] == 0: - if result["data"]["cancel"] == 1: - # 鎾ゅ崟鎴愬姛 - TradeResultProcessor.cancel_order_success(code, accountId, orderSysID) - return_str = json.dumps({"code": 0}) - else: - # 鎾ゅ崟澶辫触 - raise Exception(result["data"]["errorMsg"]) - else: - raise Exception(result["msg"]) - - elif code: - state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) - if state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_CANCEL_ING: - try: - l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "鎵嬪姩鎾ら攢", cancel_type=trade_constant.CANCEL_TYPE_HUMAN) - return_str = json.dumps({"code": 0}) - except Exception as e: - logging.exception(e) - return_str = json.dumps({"code": 2, "msg": str(e)}) - else: - return_str = json.dumps({"code": 1, "msg": "鏈浜庡彲鎾ゅ崟鐘舵��"}) - else: - return_str = json.dumps({"code": 1, "msg": "璇蜂笂浼犱唬鐮�"}) - break - - elif type_ == 'sell': - # 楠岃瘉绛惧悕 - if not is_sign_right: - raise Exception("绛惧悕閿欒") - codes_data = data_json["data"] - code = codes_data["code"] - volume = codes_data["volume"] - price = codes_data["price"] - # 鏄惁寮哄埗鍗�0/1 - force_sell = codes_data["force"] - # TODO 寮哄埗鍗栫瓥鐣� - if volume == 0: - # 鏌ヨ鎸佷粨閲� - volume = huaxin_trade_record_manager.PositionManager.get_code_volume(code) - if volume <= 0: - raise Exception("浠g爜鏃犳寔浠�") - - if not price: - # 鑾峰彇鐜颁环 - prices = HistoryKDatasUtils.get_now_price([code]) - if not prices: - raise Exception("鐜颁环鑾峰彇澶辫触") - # 宸茬幇浠风殑5妗d环鍗� - price = prices[0][1] - 0.04 - - result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_SELL, code, volume, price) - if result["code"] == 0: - if result["data"]["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected or ( - type(result["data"]["orderStatus"]) == int and result["data"]["orderStatus"] < 0): - raise Exception(result["data"]["statusMsg"]) - else: - return_str = json.dumps({"code": 0, "msg": ""}) - else: - raise Exception(result["msg"]) - - print("---鍗栧嚭缁撴灉----") - print(result) - break - elif type_ == 'delegate_list': - # 濮旀墭鍒楄〃 - update_time = data_json["data"]["update_time"] - # 鏄惁鍙挙 0/1 - can_cancel = data_json["data"]["can_cancel"] - results, update_time = None, None - if can_cancel: - results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day( - tool.get_now_date_str("%Y%m%d"), None, - [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded]) - else: - results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day( - tool.get_now_date_str("%Y%m%d"), update_time) - return_str = json.dumps( - {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "璇蜂笂浼犱唬鐮�"}) - break - elif type_ == 'deal_list': - # 鎴愪氦鍒楄〃 - results = huaxin_trade_record_manager.DealRecordManager.list_by_day( - tool.get_now_date_str("%Y%m%d")) - return_str = json.dumps( - {"code": 0, "data": {"list": results}, "msg": ""}) - elif type_ == 'position_list': - # 鎸佷粨鑲″垪琛� - results, update_time = huaxin_trade_record_manager.PositionManager.list_by_day( - tool.get_now_date_str("%Y%m%d")) - return_str = json.dumps( - {"code": 0, "data": {"list": results}, "msg": ""}) - elif type_ == 'money_list': - # 璧勯噾璇︽儏 - money_data = huaxin_trade_record_manager.MoneyManager.get_data() - return_str = json.dumps( - {"code": 0, "data": money_data, "msg": ""}) - elif type_ == 'sync_trade_data': - # 鍚屾浜ゆ槗鏁版嵁 - sync_type = data_json["data"]["type"] - if sync_type == "delegate_list": - huaxin_trade_data_update.add_delegate_list("鎺ュ彛") - elif sync_type == "deal_list": - huaxin_trade_data_update.add_deal_list() - elif sync_type == "money": - huaxin_trade_data_update.add_money_list() - elif sync_type == "position_list": - huaxin_trade_data_update.add_position_list() - return_str = json.dumps( - {"code": 0, "data": {}, "msg": ""}) - elif type_ == "get_huaxin_subscript_codes": - # 鑾峰彇鍗庨懌璁㈤槄鐨勪唬鐮� - codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes() - fresults = [] - if codes: - for code in codes: - code_name = gpcode_manager.get_code_name(code) - fresults.append((code, code_name)) - return_str = json.dumps( - {"code": 0, "data": {"count": len(fresults), "list": fresults}, "msg": ""}) - elif type_ == "export_l2_data": - # 瀵煎嚭L2鏁版嵁 - code = data_json["data"]["code"] - try: - data_export_util.export_l2_excel(code) - return_str = json.dumps( - {"code": 0, "data": {}, "msg": ""}) - except Exception as e: - logging.exception(e) - return_str = json.dumps( - {"code": 1, "msg": str(e)}) - elif type_ == 'everyday_init': - # 姣忔棩鍒濆鍖� - inited_data.everyday_init() - return_str = json.dumps( - {"code": 0, "data": {}, "msg": ""}) - elif type_ == 'huaxin_channel_state': - # 鍗庨懌閫氶亾鐘舵�� - types = [huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_TRADE, - huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_CMD_L2, - huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_DEAL_LIST, - huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST, - huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_MONEY, - huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_POSITION_LIST] - fdata = {} - for t in types: - client_list = huaxin_trade_api.ClientSocketManager.list_client(t) - client_state_list = [] - for client in client_list: - # 鍒ゆ柇鏄惁宸茬粡涓婇攣 - lock_state = huaxin_trade_api.ClientSocketManager.is_client_locked(client[0]) - lock_state_desc = "" - if lock_state is None: - lock_state_desc = "鏈煡" - elif lock_state: - lock_state_desc = "宸查攣" - else: - lock_state_desc = "鏈攣" - client_state_list.append((client[0], lock_state_desc)) - fdata[t] = client_state_list - return_str = json.dumps( - {"code": 0, "data": fdata, "msg": ""}) - elif type_ == 'juejin_is_valid': - # 鎺橀噾鏄惁鍙敤 - try: - date = JueJinApi.get_previous_trading_date(tool.get_now_date_str()) - if date: - return_str = json.dumps( - {"code": 0, "msg": ""}) - except Exception as e: - return_str = json.dumps( - {"code": 0, "msg": str(e)}) - elif type_ == 'get_env_info': - # 鑾峰彇鐜淇℃伅 - fdata = {} - try: - date = JueJinApi.get_previous_trading_date(tool.get_now_date_str()) - if date: - fdata["juejin"] = 1 - except Exception as e: - fdata["juejin"] = 0 - fdata["kpl"] = {} - # 鑾峰彇寮�鐩樺暒鏁版嵁 - kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value, - KPLDataType.INDUSTRY_RANK.value] - for kpl_type in kpl_types: - if kpl_type in KPLDataManager.kpl_data_update_info: - fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type) - - try: - # 楠岃瘉redis - RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test") - fdata["redis"] = 1 - except: - fdata["redis"] = 0 - - try: - # 楠岃瘉mysql - mysql_data.Mysqldb().select_one("select 1") - fdata["mysql"] = 1 - except: - fdata["mysql"] = 0 - - try: - # redis寮傛浠诲姟鏁伴噺 - fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count() - except: - pass - - # 鑾峰彇CPU涓庡唴瀛橀�傜敤鎯呭喌 - memory_info = psutil.virtual_memory() - cpu_percent = psutil.cpu_percent(interval=1) - fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent} - return_str = json.dumps( - {"code": 0, "data": fdata, "msg": ""}) - elif type_ == 'test_redis': - redis = redis_manager.RedisManager(5).getRedisNoPool() - try: - _start_time = time.time() - times = [] - for i in range(0, 100): - RedisUtils.sadd(redis, "test_set", f"000000:{i}", auto_free=False) - times.append(time.time() - _start_time) - _start_time = time.time() - for i in range(0, 20): - RedisUtils.smembers(redis, "test_set", auto_free=False) - times.append(time.time() - _start_time) - return_str = json.dumps( - {"code": 0, "data": times, "msg": ""}) - finally: - redis.close() - # RedisUtils.realse(redis) - - # 鏌ヨ濮旀墭鍒楄〃 - elif type_ == 'test': - # 鍗栧嚭 - # trade_api.order(trade_api.TRADE_DIRECTION_SELL, "600854", 100, 5.45) - result = huaxin_trade_api.get_deal_list() - print("\n\n---鎴愪氦鍒楄〃----") - for d in result["data"]: - print(d) - - result = huaxin_trade_api.get_delegate_list(True) - print("\n\n---鍙挙濮旀墭----") - for d in result["data"]: - print(d) - result = huaxin_trade_api.get_delegate_list(False) - print("\n\n---鍏ㄩ儴濮旀墭----") - for d in result["data"]: - print(d) - - result = huaxin_trade_api.get_position_list() - print("\n\n---鎸佷粨鍒楄〃----") - for d in result["data"]: - print(d) - - result = huaxin_trade_api.get_money() - print("\n\n---璐︽埛鍒楄〃----") - for d in result["data"]: - print(d) - elif type_ == 'test_l2': - codes_data = data_json["data"] - result = huaxin_trade_api.set_l2_codes_data(codes_data) - print("\n\n---L2璁剧疆缁撴灉----") - print(result) - break - # sk.close() - except Exception as e: - logging.exception(e) - return_str = json.dumps({"code": 401, "msg": str(e)}) - break - finally: - sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8'))) - - def finish(self): - super().finish() - - -def __set_target_codes(queue_other_w_l2_r: multiprocessing.Queue): - logger_system.info("鍚姩璇诲彇L2璁㈤槄浠g爜闃熷垪") - while True: - try: - _datas = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop() - if _datas: - times = _datas[0] - datas = _datas[1] - request_id = _datas[2] - logger_l2_codes_subscript.info("({})璇诲彇L2浠g爜澶勭悊闃熷垪锛氭暟閲�-{}", request_id, len(datas)) - # 鍙鐞�20s鍐呯殑鏁版嵁 - if time.time() - times < 20: - # 鑾峰彇娑ㄥ仠鍒楄〃涓殑鏁版嵁 - # datas涓殑鏁版嵁鏍煎紡:(浠g爜, 鐜颁环, 娑ㄥ箙, 閲�, 鏃堕棿) - if not datas: - # 娌℃湁鏁版嵁闇�瑕佸鐞� - continue - - # 鍐嶆鑾峰彇浠g爜 - codes = [d[0] for d in datas] - for code in codes: - block_info.init_code(code) - root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, - "data": datas} - queue_other_w_l2_r.put_nowait(json.dumps(root_data)) - # 濡傛灉鍦�9:25-9:29 闇�瑕佸姞杞芥澘鍧� - if int("092500") < int(tool.get_now_time_str().replace(":", "")) < int("092900"): - for d in datas: - threading.Thread(target=lambda: KPLCodeJXBlockManager().load_jx_blocks(d[0], - gpcode_manager.get_price( - d[0]), - float(d[2]), - KPLLimitUpDataRecordManager.get_current_reasons()), - daemon=True).start() - time.sleep(0.2) - logger_l2_codes_subscript.info("({})鍙戦�佸埌鍗庨懌L2浠g爜澶勭悊闃熷垪锛氭暟閲�-{}", request_id, len(datas)) - except Exception as e: - logging.exception(e) - logger_l2_codes_subscript.exception(e) - finally: - time.sleep(0.01) - - -def __read_sync_task(pipe): - logger_system.info("鍚姩璇诲彇鏁版嵁鍚屾鏈嶅姟") - while True: - try: - if pipe: - val = pipe.recv() - if val: - print("鎺ユ敹鍒版洿鏂颁换鍔★細", val) - val = json.loads(val) - type_ = val["type"] - if type_ == "want_list": - print("want_list before", gpcode_manager.WantBuyCodesManager().list_code_cache()) - gpcode_manager.WantBuyCodesManager().sync() - print("want_list after", gpcode_manager.WantBuyCodesManager().list_code_cache()) - elif type_ == "white_list": - print("white_list before", gpcode_manager.WhiteListCodeManager().list_codes_cache()) - gpcode_manager.WhiteListCodeManager().sync() - print("white_list after", gpcode_manager.WhiteListCodeManager().list_codes_cache()) - elif type_ == "black_list": - print("black_list before", gpcode_manager.BlackListCodeManager().list_codes_cache()) - gpcode_manager.BlackListCodeManager().sync() - print("black_list after", gpcode_manager.BlackListCodeManager().list_codes_cache()) - elif type_ == "pause_buy_list": - print("pause_buy_list before", gpcode_manager.PauseBuyCodesManager().list_code_cache()) - gpcode_manager.PauseBuyCodesManager().sync() - print("pause_buy_list after", gpcode_manager.PauseBuyCodesManager().list_code_cache()) - elif type_ == "trade_state": - print("trade_state before", trade_manager.TradeStateManager().is_can_buy_cache()) - trade_manager.TradeStateManager().sync() - print("trade_state after", trade_manager.TradeStateManager().is_can_buy_cache()) - elif type_ == "trade_mode": - print("trade_mode before", trade_manager.TradeTargetCodeModeManager().get_mode_cache()) - trade_manager.TradeTargetCodeModeManager().sync() - print("trade_mode after", trade_manager.TradeTargetCodeModeManager().get_mode_cache()) - - - - - - - except Exception as e: - logging.exception(e) - finally: - time.sleep(1) - - -def run(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w): - logger_system.info("create TradeApiServer") - logger_system.info(f"trade_api_server 绾跨▼ID:{tool.get_thread_id()}") - # 鎷夊彇浜ゆ槗淇℃伅 - huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w, queue_other_w_l2_r) - t1 = threading.Thread(target=lambda: __set_target_codes(queue_other_w_l2_r), daemon=True) - t1.start() - - t1 = threading.Thread(target=lambda: __read_sync_task(pipe_server), daemon=True) - t1.start() - - while True: - time.sleep(5) - - # 鎵樼鐜涓嬩笉寮�鍚帴鍙� - # laddr = "0.0.0.0", 10009 - # tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle - # tcpserver.serve_forever() - - -if __name__ == "__main__": - pass diff --git a/servers/huaxin_trade_server.py b/servers/huaxin_trade_server.py index c72c2a5..5623b2b 100644 --- a/servers/huaxin_trade_server.py +++ b/servers/huaxin_trade_server.py @@ -291,59 +291,6 @@ __process_l1_data_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) __updating_jx_blocks_codes = set() - @classmethod - def sell(cls, datas): - rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL]) - excuted_rule_ids = set() - if rules: - for d in datas: - code = d[0] - # 鏍煎紡 (浠g爜,鐜颁环,娑ㄥ箙,閲�,鏇存柊鏃堕棿,涔�1浠锋牸,涔�1閲�) - buy1_volume = d[6] - buy1_price = d[5] - if buy1_volume: - for r in rules: - # 鐢熸晥鏃堕棿 - if r.code == code: - # --------鍒ゆ柇鏄惁鍙互鎵ц-------- - can_excute = False - if round(float(buy1_price), 2) <= round(float(r.buy1_price), 2): - # 浠锋牸宸茬粡瑙﹀彂 - if r.buy1_volume: - if r.buy1_volume >= buy1_volume: - # 閲忎环瑙﹀彂 - can_excute = True - async_log_util.info(logger_trade, f"瑙﹀彂鍗栬鍒欙細閲忚Е鍙憑buy1_volume}/{r.buy1_volume}") - else: - can_excute = True - async_log_util.info(logger_trade, f"瑙﹀彂鍗栬鍒欙細浠锋牸瑙﹀彂{buy1_price}/{r.buy1_price}") - # 浠锋牸瑙﹀彂 - # 鑾峰彇浠锋牸绫诲瀷 - if not can_excute: - continue - - # 璇锋眰鍗栧嚭閿� - TradeRuleManager().require_sell_lock(r.id_) - try: - if r.id_ in excuted_rule_ids: - continue - excuted_rule_ids.add(r.id_) - # 鑾峰彇鏈�鏂扮殑鎵ц鐘跺喌 - r = TradeRuleManager().get_by_id(r.id_) - if r.excuted: - continue - # 鎻愪氦鍗� - limit_down_price = gpcode_manager.get_limit_down_price(code) - limit_up_price = gpcode_manager.get_limit_up_price(code) - huaxin_sell_util.start_sell(code, r.sell_volume, r.sell_price_type, limit_up_price, - limit_down_price, - buy1_price) - TradeRuleManager().excuted(r.id_) - except Exception as e: - logger_debug.exception(e) - finally: - TradeRuleManager().release_sell_lock(r.id_) - # 淇濆瓨鐜颁环 @classmethod def __save_l1_current_price(cls, datas): @@ -400,14 +347,6 @@ else: cls.__process_l1_data_thread_pool.submit( lambda: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)) - - @classmethod - def set_l1_trade_codes_info(cls, data_json): - data = data_json["data"] - request_id = data_json["request_id"] - datas = data["data"] - cls.__save_l1_current_price(datas) - cls.sell(datas) @classmethod def l2_order(cls, code, _datas, timestamp): @@ -536,13 +475,6 @@ def trading_order_canceled(cls, code, order_no): pass - @classmethod - def test_sell(cls): - # (浠g爜, 鐜颁环, 娑ㄥ箙, 閲�, 鏇存柊鏃堕棿, 涔�1浠锋牸, 涔�1閲�) - datas = [("600571", 12.14, 9.96, 100000000, tool.get_now_time_str(), 12.14, 10210), - ("600571", 12.04, 9.96, 100000000, tool.get_now_time_str(), 12.04, 10210)] - cls.sell(datas) - def clear_invalid_client(): logger_system.info(f"trade_server clear_invalid_client 绾跨▼ID:{tool.get_thread_id()}") @@ -553,28 +485,6 @@ pass finally: time.sleep(2) - - -def __recv_pipe_l1(queue_l1_w_strategy_r: multiprocessing.Queue): - logger_system.info(f"trade_server __recv_pipe_l1 绾跨▼ID:{tool.get_thread_id()}") - if queue_l1_w_strategy_r is not None: - while True: - try: - val = queue_l1_w_strategy_r.get() - if val: - val = json.loads(val) - # print("鏀跺埌鏉ヨ嚜L1鐨勬暟鎹細", val["type"]) - # 澶勭悊鏁版嵁 - type_ = val["type"] - timestamp = val.get("time") - # 澶т簬10s鐨勬暟鎹斁寮冨鐞� - if type_ == "set_target_codes": - async_log_util.info(logger_l2_codes_subscript, f"绛栫暐鎺ユ敹鍒版暟鎹�") - if time.time() * 1000 - timestamp > 10 * 1000: - continue - TradeServerProcessor.set_target_codes(val) - except Exception as e: - logger_debug.exception(e) # 鎺掑緱澶繙鎾ゅ崟 @@ -641,28 +551,6 @@ logger_debug.exception(e) finally: time.sleep(3) - - -def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue): - logger_system.info(f"trade_server __recv_pipe_l1_trade 绾跨▼ID:{tool.get_thread_id()}") - if queue_l1_trade_w_strategy_r is not None: - while True: - try: - val = queue_l1_trade_w_strategy_r.get() - if val: - async_log_util.info(logger_local_huaxin_l1_trade_info, f"瀹㈡埛绔帴鏀讹細{val}") - val = json.loads(val) - # print("鏀跺埌鏉ヨ嚜L1鐨勬暟鎹細", val["type"]) - # 澶勭悊鏁版嵁 - type_ = val["type"] - if type_ == "upload_l1_trade_datas": - # 澶勭悊涓撲负浜ゆ槗鎻愪緵鐨凩1鏁版嵁 - TradeServerProcessor.set_l1_trade_codes_info(val) - async_log_util.info(logger_local_huaxin_l1_trade_info, val) - - except Exception as e: - logger_local_huaxin_l1_trade_info.exception(e) - logging.exception(e) class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack): @@ -873,7 +761,8 @@ result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas) async_log_util.info(logger_l2_radical_buy, f"閲忎拱鍏ョ粨鏋滃垽鏂細{code}, 缁撴灉锛歿result_by_volume} 鏉垮潡锛歿buy_blocks}") in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks() - buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] + buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b), + in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE: if tool.get_now_time_as_int() < 93200: radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code) @@ -1018,19 +907,17 @@ threading.Thread(target=run_pending, daemon=True).start() l2_data_util.load_l2_data_all(True) + # L2鎴愪氦淇″彿鍥炶皟 L2TradeSingleDataManager.set_callback(MyL2TradeSingleCallback()) # 鍔犺浇鑷敱娴侀�氶噺 global_data_loader.load_zyltgb_volume_from_db() -def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, - queue_l1_trade_w_strategy_r, trade_ipc_addr): +def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr): """ @param queue_strategy_r_trade_w: - @param queue_l1_w_strategy_r: @param queue_strategy_w_trade_r: @param queue_strategy_w_trade_r_for_read: - @param queue_l1_trade_w_strategy_r: @param trade_ipc_addr: 浜ゆ槗IPC鍦板潃锛氾紙涓嬪崟ipc鍦板潃,鎾ゅ崟ipc鍦板潃锛� @return: """ @@ -1050,32 +937,15 @@ huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr) - # 鐩戝惉l1閭h竟浼犺繃鏉ョ殑浠g爜 - t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True) - t1.start() - - # 鐩戝惉l1浜ゆ槗閭h竟浼犺繃鏉ョ殑浠g爜 - t1 = threading.Thread(target=lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r), daemon=True) - t1.start() - # 涓嬪崟璺濈澶繙鍙栨秷璁㈠崟 t1 = threading.Thread(target=lambda: __cancel_buy_for_too_far(), daemon=True) t1.start() - # 鍚屾寮傛鏃ュ織 - t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True) - t1.start() - - # 鍚屾L2鐨勫紓姝ユ棩蹇� - l2_log.codeLogQueueDistributeManager.run_async() - - t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True) - t1.start() - - logger_system.info("create TradeServer") + # 娓呯悊鏃犵敤鐨勫鎴风 t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) t1.start() + logger_system.info("create TradeServer") laddr = "0.0.0.0", 10008 try: tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle diff --git a/task/task_manager.py b/task/task_manager.py new file mode 100644 index 0000000..de8ed51 --- /dev/null +++ b/task/task_manager.py @@ -0,0 +1,111 @@ +import json +import logging +import multiprocessing +import threading +import time + +from db import redis_manager_delegate as redis_manager +from l2 import l2_log +from l2.huaxin import huaxin_target_codes_manager +from log_module import async_log_util +from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug +from servers.huaxin_trade_server import TradeServerProcessor +from third_data import block_info +from trade.huaxin import huaxin_trade_data_update +from trade.huaxin.huaxin_trade_api import ClientSocketManager +from utils import tool + + +def __listen_l1_target_codes(queue_l1_w_strategy_r: multiprocessing.Queue): + logger_system.info(f"__listen_l1_target_codes 绾跨▼ID:{tool.get_thread_id()}") + if queue_l1_w_strategy_r is not None: + while True: + try: + val = queue_l1_w_strategy_r.get() + if val: + val = json.loads(val) + # print("鏀跺埌鏉ヨ嚜L1鐨勬暟鎹細", val["type"]) + # 澶勭悊鏁版嵁 + type_ = val["type"] + timestamp = val.get("time") + # 澶т簬10s鐨勬暟鎹斁寮冨鐞� + if type_ == "set_target_codes": + async_log_util.info(logger_l2_codes_subscript, f"绛栫暐鎺ユ敹鍒版暟鎹�") + if time.time() * 1000 - timestamp > 10 * 1000: + continue + TradeServerProcessor.set_target_codes(val) + except Exception as e: + logger_debug.exception(e) + + +def __listen_l2_subscript_target_codes(queue_other_w_l2_r: multiprocessing.Queue): + """ + 鐩戝惉L2璁㈤槄鐩爣浠g爜 + @param queue_other_w_l2_r: + @return: + """ + logger_system.info("鍚姩璇诲彇L2璁㈤槄浠g爜闃熷垪") + while True: + try: + _datas = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop() + if _datas: + times = _datas[0] + datas = _datas[1] + request_id = _datas[2] + logger_l2_codes_subscript.info("({})璇诲彇L2浠g爜澶勭悊闃熷垪锛氭暟閲�-{}", request_id, len(datas)) + # 鍙鐞�20s鍐呯殑鏁版嵁 + if time.time() - times < 20: + # 鑾峰彇娑ㄥ仠鍒楄〃涓殑鏁版嵁 + # datas涓殑鏁版嵁鏍煎紡:(浠g爜, 鐜颁环, 娑ㄥ箙, 閲�, 鏃堕棿) + if not datas: + # 娌℃湁鏁版嵁闇�瑕佸鐞� + continue + + # 鍐嶆鑾峰彇浠g爜 + codes = [d[0] for d in datas] + for code in codes: + block_info.init_code(code) + root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, + "data": datas} + queue_other_w_l2_r.put_nowait(json.dumps(root_data)) + # 濡傛灉鍦�9:25-9:29 闇�瑕佸姞杞芥澘鍧� + # if int("092500") < int(tool.get_now_time_str().replace(":", "")) < int("092900"): + # for d in datas: + # threading.Thread(target=lambda: KPLCodeJXBlockManager().load_jx_blocks(d[0], + # gpcode_manager.get_price( + # d[0]), + # float(d[2]), + # KPLLimitUpDataRecordManager.get_current_reasons()), + # daemon=True).start() + # time.sleep(0.2) + logger_l2_codes_subscript.info("({})鍙戦�佸埌鍗庨懌L2浠g爜澶勭悊闃熷垪锛氭暟閲�-{}", request_id, len(datas)) + except Exception as e: + logging.exception(e) + logger_l2_codes_subscript.exception(e) + finally: + time.sleep(0.01) + + +def run_data_listener(queue_other_w_l2_r, queue_l1_w_strategy_r): + """ + 杩愯鏁版嵁鐩戝惉鍣� + @param queue_other_w_l2_r: + @return: + """ + # 浜ゆ槗鏁版嵁鏇存柊浠诲姟 + huaxin_trade_data_update.run() + + # 鎺ユ敹鏉ヨ嚜L1鐨勬暟鎹� + threading.Thread(target=lambda: __listen_l1_target_codes(queue_l1_w_strategy_r), daemon=True).start() + + # 鎺ユ敹L2璁㈤槄 + threading.Thread(target=lambda: __listen_l2_subscript_target_codes(queue_other_w_l2_r), daemon=True).start() + # 杩愯寮傛redis鍚屾鏈嶅姟 + threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True).start() + # 鍚屾寮傛鏃ュ織 + threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start() + # 鍚屾L2鐨勫紓姝ユ棩蹇� + l2_log.codeLogQueueDistributeManager.run_async() + threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True).start() + while True: + time.sleep(5) diff --git a/trade/huaxin/huaxin_trade_data_update.py b/trade/huaxin/huaxin_trade_data_update.py index d07fa3a..8a7b4d5 100644 --- a/trade/huaxin/huaxin_trade_data_update.py +++ b/trade/huaxin/huaxin_trade_data_update.py @@ -1,7 +1,6 @@ """ 鍗庨懌浜ゆ槗鏁版嵁鏇存柊 """ -import json import logging import queue import threading @@ -16,7 +15,7 @@ from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager from trade.huaxin.huaxin_trade_order_processor import HuaxinOrderEntity, TradeResultProcessor -from utils import huaxin_util, tool, init_data_util +from utils import huaxin_util import concurrent.futures trade_data_request_queue = queue.Queue() @@ -185,9 +184,6 @@ # 杩愯 -def run(queue_l1_trade_r_strategy_w_, queue_other_w_l2_r_): - global queue_l1_trade_r_strategy_w, queue_other_w_l2_r - queue_l1_trade_r_strategy_w = queue_l1_trade_r_strategy_w_ - queue_other_w_l2_r = queue_other_w_l2_r_ +def run(): t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True) t1.start() -- Gitblit v1.8.0