From 6da815f5ecdedca45e8fc99989115fbf2b9b570c Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 26 七月 2023 12:39:28 +0800 Subject: [PATCH] bug修复 --- trade/huaxin/trade_server.py | 8 trade/huaxin/huaxin_trade_api.py | 14 ++ trade/huaxin/trade_api_server.py | 125 +++--------------------- trade/huaxin/huaxin_trade_data_update.py | 114 ++++++++++++++++++++++ 4 files changed, 148 insertions(+), 113 deletions(-) diff --git a/trade/huaxin/huaxin_trade_api.py b/trade/huaxin/huaxin_trade_api.py index 68eb204..ccb85ea 100644 --- a/trade/huaxin/huaxin_trade_api.py +++ b/trade/huaxin/huaxin_trade_api.py @@ -8,6 +8,7 @@ import time from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop +from trade.huaxin import huaxin_trade_data_update from utils import socket_util @@ -231,7 +232,11 @@ "price_type": price_type, "price": price, "sinfo": f"b_{code}_{round(time.time() * 1000)}"}) - return __read_response(client, request_id, blocking) + try: + return __read_response(client, request_id, blocking) + finally: + huaxin_trade_data_update.add_delegate_list() + huaxin_trade_data_update.add_money_list() def cancel_order(direction, code, orderSysID, blocking=True): @@ -240,8 +245,11 @@ "direction": direction, "code": code, "orderSysID": orderSysID, "sinfo": f"cb_{code}_{round(time.time() * 1000)}"}) - - return __read_response(client, request_id, blocking) + try: + return __read_response(client, request_id, blocking) + finally: + huaxin_trade_data_update.add_delegate_list() + huaxin_trade_data_update.add_money_list() # CLIENT_TYPE_DELEGATE_LIST = "delegate_list" diff --git a/trade/huaxin/huaxin_trade_data_update.py b/trade/huaxin/huaxin_trade_data_update.py new file mode 100644 index 0000000..e3d9664 --- /dev/null +++ b/trade/huaxin/huaxin_trade_data_update.py @@ -0,0 +1,114 @@ +""" +鍗庨懌浜ゆ槗鏁版嵁鏇存柊 +""" +import logging +import queue +import threading +import time + +from log_module.log import hx_logger_trade_debug +from trade import trade_huaxin, trade_manager +from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager +from utils import huaxin_util + +trade_data_request_queue = queue.Queue() + + +def __read_trade_data_queue(): + while True: + try: + data = trade_data_request_queue.get() + if data: + type_ = data["type"] + hx_logger_trade_debug.info(f"鑾峰彇浜ゆ槗鏁版嵁寮�濮嬶細{type_}") + try: + if type_ == "delegate_list": + dataJSON = huaxin_trade_api.get_delegate_list(can_cancel=False) + print("鑾峰彇濮旀墭鍒楄〃", dataJSON) + if dataJSON["code"] == 0: + data = dataJSON["data"] + huaxin_trade_record_manager.DelegateRecordManager.add(data) + # 鏄惁鍙互鎾ゅ崟 + if data: + codes = [] + for d in data: + if huaxin_util.is_can_cancel(d["orderStatus"]): + codes.append(d["securityID"]) + # 璁剧疆涓嬪崟鎴愬姛 + trade_huaxin.order_success(d['securityID'], + d['accountID'], + d['orderSysID']) + + if codes: + try: + trade_manager.process_trade_delegate_data([{"code": c} for c in codes]) + except Exception as e: + logging.exception(e) + elif type_ == "money": + dataJSON = huaxin_trade_api.get_money() + if dataJSON["code"] == 0: + data = dataJSON["data"] + huaxin_trade_record_manager.MoneyManager.save_data(data) + if data: + usefulMoney = data[0]["usefulMoney"] + # 璁剧疆鍙敤璧勯噾 + trade_manager.set_available_money(0, usefulMoney) + # 璁剧疆鍙敤璧勯噾 + elif type_ == "deal_list": + dataJSON = huaxin_trade_api.get_deal_list() + print("鎴愪氦鍝嶅簲锛�", dataJSON) + if dataJSON["code"] == 0: + datas = dataJSON["data"] + huaxin_trade_record_manager.DealRecordManager.add(datas) + if datas: + tempList = [ + {"time": d["tradeTime"], "type": int(d['direction']), "code": d['securityID']} + for d in datas] + try: + trade_manager.process_trade_success_data(tempList) + except Exception as e: + logging.exception(e) + # 鎸佷粨鑲� + elif type_ == "position_list": + dataJSON = huaxin_trade_api.get_position_list() + if dataJSON["code"] == 0: + data = dataJSON["data"] + huaxin_trade_record_manager.PositionManager.add(data) + + hx_logger_trade_debug.info(f"鑾峰彇浜ゆ槗鏁版嵁鎴愬姛锛歿type_}") + except Exception as e1: + if str(e1).find("瓒呮椂") >= 0: + # 璇诲彇缁撴灉瓒呮椂闇�瑕侀噸鏂拌姹� + trade_data_request_queue.put_nowait({"type": type_}) + raise e1 + except Exception as e: + hx_logger_trade_debug.exception(e) + finally: + # 鏈�0.1s鐨勯棿闅� + time.sleep(0.1) + + +def __add_data(data): + trade_data_request_queue.put_nowait(data) + + +def add_delegate_list(): + __add_data({"type": "delegate_list"}) + + +def add_deal_list(): + __add_data({"type": "deal_list"}) + + +def add_money_list(): + __add_data({"type": "money"}) + + +def add_position_list(): + __add_data({"type": "position_list"}) + + +# 杩愯 +def run(): + t1 = threading.Thread(target=lambda: __read_trade_data_queue(), daemon=True) + t1.start() diff --git a/trade/huaxin/trade_api_server.py b/trade/huaxin/trade_api_server.py index d0c9533..4dfc024 100644 --- a/trade/huaxin/trade_api_server.py +++ b/trade/huaxin/trade_api_server.py @@ -1,27 +1,21 @@ import hashlib import json import logging -import queue -import random import socket import socketserver import threading import time - -import constant import inited_data from code_attribute import gpcode_manager from l2 import l2_data_manager_new from l2.huaxin import huaxin_target_codes_manager -from log_module.log import hx_logger_trade_debug from third_data import block_info from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi from trade import trade_manager, trade_huaxin -from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_record_manager +from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager, \ + huaxin_trade_data_update from utils import socket_util, tool, huaxin_util, data_export_util - -trade_data_request_queue = queue.Queue() class MyTCPServer(socketserver.TCPServer): @@ -94,8 +88,8 @@ raise Exception("鐜颁环鑾峰彇澶辫触") price = prices[0][1] # 涓嬪崟 - result = trade_api.order(trade_api.TRADE_DIRECTION_BUY, code, volume, - round(float(price), 2)) + result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume, + round(float(price), 2)) if result: resultJSON = result print("涓嬪崟缁撴灉锛�", resultJSON) @@ -119,9 +113,8 @@ return_str = json.dumps({"code": 0}) finally: # 鏇存柊濮旀墭鍒楄〃 - trade_data_request_queue.put_nowait({"type": "delegate_list"}) - # 鏇存柊璧勯噾 - trade_data_request_queue.put_nowait({"type": "money"}) + huaxin_trade_data_update.add_delegate_list() + huaxin_trade_data_update.add_money_list() else: raise Exception(resultJSON['msg']) break @@ -136,7 +129,7 @@ orderSysID = codes_data.get("orderSysID") accountId = codes_data.get("accountId") if code and orderSysID and accountId: - result = trade_api.cancel_order(trade_api.TRADE_DIRECTION_BUY, code, orderSysID, True) + result = huaxin_trade_api.cancel_order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, orderSysID, True) print("---鎾ゅ崟缁撴灉----") print(result) if result["code"] == 0: @@ -162,10 +155,6 @@ return_str = json.dumps({"code": 1, "msg": "鏈浜庡彲鎾ゅ崟鐘舵��"}) else: return_str = json.dumps({"code": 1, "msg": "璇蜂笂浼犱唬鐮�"}) - # 鏇存柊濮旀墭鍒楄〃 - trade_data_request_queue.put_nowait({"type": "delegate_list"}) - # 鏇存柊璧勯噾 - trade_data_request_queue.put_nowait({"type": "money"}) break elif type_ == 'sell': @@ -193,14 +182,13 @@ # 宸茬幇浠风殑5妗d环鍗� price = prices[0][1] - 0.04 - result = trade_api.order(trade_api.TRADE_DIRECTION_SELL, code, volume, price) + result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_SELL, code, volume, price) if result["code"] == 0: if result["data"]["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected or ( type(result["data"]["orderStatus"]) == int and result["data"]["orderStatus"] < 0): raise Exception(result["data"]["statusMsg"]) else: return_str = json.dumps({"code": 0, "msg": ""}) - trade_data_request_queue.put_nowait({"type": "delegate_list"}) else: raise Exception(result["msg"]) @@ -244,13 +232,13 @@ # 鍚屾浜ゆ槗鏁版嵁 sync_type = data_json["data"]["type"] if sync_type == "delegate_list": - trade_data_request_queue.put_nowait({"type": "delegate_list"}) + huaxin_trade_data_update.add_delegate_list() elif sync_type == "deal_list": - trade_data_request_queue.put_nowait({"type": "deal_list"}) + huaxin_trade_data_update.add_deal_list() elif sync_type == "money": - trade_data_request_queue.put_nowait({"type": "money"}) + huaxin_trade_data_update.add_money_list() elif sync_type == "position_list": - trade_data_request_queue.put_nowait({"type": "position_list"}) + huaxin_trade_data_update.add_position_list() return_str = json.dumps( {"code": 0, "data": {}, "msg": ""}) elif type_ == "get_huaxin_subscript_codes": @@ -319,32 +307,32 @@ elif type_ == 'test': # 鍗栧嚭 # trade_api.order(trade_api.TRADE_DIRECTION_SELL, "600854", 100, 5.45) - result = trade_api.get_deal_list() + result = huaxin_trade_api.get_deal_list() print("\n\n---鎴愪氦鍒楄〃----") for d in result["data"]: print(d) - result = trade_api.get_delegate_list(True) + result = huaxin_trade_api.get_delegate_list(True) print("\n\n---鍙挙濮旀墭----") for d in result["data"]: print(d) - result = trade_api.get_delegate_list(False) + result = huaxin_trade_api.get_delegate_list(False) print("\n\n---鍏ㄩ儴濮旀墭----") for d in result["data"]: print(d) - result = trade_api.get_position_list() + result = huaxin_trade_api.get_position_list() print("\n\n---鎸佷粨鍒楄〃----") for d in result["data"]: print(d) - result = trade_api.get_money() + result = huaxin_trade_api.get_money() print("\n\n---璐︽埛鍒楄〃----") for d in result["data"]: print(d) elif type_ == 'test_l2': codes_data = data_json["data"] - result = trade_api.set_l2_codes_data(codes_data) + result = huaxin_trade_api.set_l2_codes_data(codes_data) print("\n\n---L2璁剧疆缁撴灉----") print(result) break @@ -358,80 +346,6 @@ def finish(self): super().finish() - - -def __read_trade_data_queue(): - while True: - try: - data = trade_data_request_queue.get() - if data: - type_ = data["type"] - hx_logger_trade_debug.info(f"鑾峰彇浜ゆ槗鏁版嵁寮�濮嬶細{type_}") - try: - if type_ == "delegate_list": - dataJSON = huaxin_trade_api.get_delegate_list(can_cancel=False) - print("鑾峰彇濮旀墭鍒楄〃", dataJSON) - if dataJSON["code"] == 0: - data = dataJSON["data"] - huaxin_trade_record_manager.DelegateRecordManager.add(data) - # 鏄惁鍙互鎾ゅ崟 - if data: - codes = [] - for d in data: - if huaxin_util.is_can_cancel(d["orderStatus"]): - codes.append(d["securityID"]) - # 璁剧疆涓嬪崟鎴愬姛 - trade_huaxin.order_success(d['securityID'], - d['accountID'], - d['orderSysID']) - - if codes: - try: - trade_manager.process_trade_delegate_data([{"code": c} for c in codes]) - except Exception as e: - logging.exception(e) - elif type_ == "money": - dataJSON = huaxin_trade_api.get_money() - if dataJSON["code"] == 0: - data = dataJSON["data"] - huaxin_trade_record_manager.MoneyManager.save_data(data) - if data: - usefulMoney = data[0]["usefulMoney"] - # 璁剧疆鍙敤璧勯噾 - trade_manager.set_available_money(0, usefulMoney) - # 璁剧疆鍙敤璧勯噾 - elif type_ == "deal_list": - dataJSON = huaxin_trade_api.get_deal_list() - print("鎴愪氦鍝嶅簲锛�", dataJSON) - if dataJSON["code"] == 0: - datas = dataJSON["data"] - huaxin_trade_record_manager.DealRecordManager.add(datas) - if datas: - tempList = [ - {"time": d["tradeTime"], "type": int(d['direction']), "code": d['securityID']} - for d in datas] - try: - trade_manager.process_trade_success_data(tempList) - except Exception as e: - logging.exception(e) - # 鎸佷粨鑲� - elif type_ == "position_list": - dataJSON = huaxin_trade_api.get_position_list() - if dataJSON["code"] == 0: - data = dataJSON["data"] - huaxin_trade_record_manager.PositionManager.add(data) - - hx_logger_trade_debug.info(f"鑾峰彇浜ゆ槗鏁版嵁鎴愬姛锛歿type_}") - except Exception as e1: - if str(e1).find("瓒呮椂") >= 0: - # 璇诲彇缁撴灉瓒呮椂闇�瑕侀噸鏂拌姹� - trade_data_request_queue.put_nowait({"type": type_}) - raise e1 - except Exception as e: - hx_logger_trade_debug.exception(e) - finally: - # 鏈�1s鐨勯棿闅� - time.sleep(1) def __set_target_codes(): @@ -453,8 +367,7 @@ def run(): print("create TradeApiServer") # 鎷夊彇浜ゆ槗淇℃伅 - t1 = threading.Thread(target=lambda: __read_trade_data_queue(), daemon=True) - t1.start() + huaxin_trade_data_update.run() t1 = threading.Thread(target=lambda: __set_target_codes(), daemon=True) t1.start() diff --git a/trade/huaxin/trade_server.py b/trade/huaxin/trade_server.py index 0ad3ac8..fb4ce57 100644 --- a/trade/huaxin/trade_server.py +++ b/trade/huaxin/trade_server.py @@ -21,7 +21,7 @@ from third_data.code_plate_key_manager import KPLCodeJXBlockManager from trade import deal_big_money_manager, current_price_process_manager -from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server, huaxin_trade_api +from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server, huaxin_trade_api, huaxin_trade_data_update from utils import socket_util trade_data_request_queue = queue.Queue() @@ -150,9 +150,9 @@ # 璁板綍浜ゆ槗鍙嶉鏃ュ織 hx_logger_trade_callback.info(data_json) # 閲嶆柊璇锋眰濮旀墭鍒楄〃涓庤祫閲� - trade_api_server.trade_data_request_queue.put_nowait({"type": "delegate_list"}) - trade_api_server.trade_data_request_queue.put_nowait({"type": "money"}) - trade_api_server.trade_data_request_queue.put_nowait({"type": "deal_list"}) + huaxin_trade_data_update.add_delegate_list() + huaxin_trade_data_update.add_deal_list() + huaxin_trade_data_update.add_money_list() # print("鍝嶅簲缁撴灉锛�", data_json['data']) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) -- Gitblit v1.8.0