Administrator
2023-07-26 6da815f5ecdedca45e8fc99989115fbf2b9b570c
bug修复
3个文件已修改
1个文件已添加
255 ■■■■■ 已修改文件
trade/huaxin/huaxin_trade_api.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 114 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py 123 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py
@@ -8,6 +8,7 @@
import time
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop
from trade.huaxin import huaxin_trade_data_update
from utils import socket_util
@@ -231,7 +232,11 @@
                                    "price_type": price_type,
                                    "price": price, "sinfo": f"b_{code}_{round(time.time() * 1000)}"})
    try:
    return __read_response(client, request_id, blocking)
    finally:
        huaxin_trade_data_update.add_delegate_list()
        huaxin_trade_data_update.add_money_list()
def cancel_order(direction, code, orderSysID, blocking=True):
@@ -240,8 +245,11 @@
                                    "direction": direction,
                                    "code": code,
                                    "orderSysID": orderSysID, "sinfo": f"cb_{code}_{round(time.time() * 1000)}"})
    try:
    return __read_response(client, request_id, blocking)
    finally:
        huaxin_trade_data_update.add_delegate_list()
        huaxin_trade_data_update.add_money_list()
# CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
trade/huaxin/huaxin_trade_data_update.py
New file
@@ -0,0 +1,114 @@
"""
华鑫交易数据更新
"""
import logging
import queue
import threading
import time
from log_module.log import hx_logger_trade_debug
from trade import trade_huaxin, trade_manager
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager
from utils import huaxin_util
trade_data_request_queue = queue.Queue()
def __read_trade_data_queue():
    while True:
        try:
            data = trade_data_request_queue.get()
            if data:
                type_ = data["type"]
                hx_logger_trade_debug.info(f"获取交易数据开始:{type_}")
                try:
                    if type_ == "delegate_list":
                        dataJSON = huaxin_trade_api.get_delegate_list(can_cancel=False)
                        print("获取委托列表", dataJSON)
                        if dataJSON["code"] == 0:
                            data = dataJSON["data"]
                            huaxin_trade_record_manager.DelegateRecordManager.add(data)
                            # 是否可以撤单
                            if data:
                                codes = []
                                for d in data:
                                    if huaxin_util.is_can_cancel(d["orderStatus"]):
                                        codes.append(d["securityID"])
                                        # 设置下单成功
                                        trade_huaxin.order_success(d['securityID'],
                                                                   d['accountID'],
                                                                   d['orderSysID'])
                                if codes:
                                    try:
                                        trade_manager.process_trade_delegate_data([{"code": c} for c in codes])
                                    except Exception as e:
                                        logging.exception(e)
                    elif type_ == "money":
                        dataJSON = huaxin_trade_api.get_money()
                        if dataJSON["code"] == 0:
                            data = dataJSON["data"]
                            huaxin_trade_record_manager.MoneyManager.save_data(data)
                            if data:
                                usefulMoney = data[0]["usefulMoney"]
                                # 设置可用资金
                                trade_manager.set_available_money(0, usefulMoney)
                            # 设置可用资金
                    elif type_ == "deal_list":
                        dataJSON = huaxin_trade_api.get_deal_list()
                        print("成交响应:", dataJSON)
                        if dataJSON["code"] == 0:
                            datas = dataJSON["data"]
                            huaxin_trade_record_manager.DealRecordManager.add(datas)
                            if datas:
                                tempList = [
                                    {"time": d["tradeTime"], "type": int(d['direction']), "code": d['securityID']}
                                    for d in datas]
                                try:
                                    trade_manager.process_trade_success_data(tempList)
                                except Exception as e:
                                    logging.exception(e)
                    # 持仓股
                    elif type_ == "position_list":
                        dataJSON = huaxin_trade_api.get_position_list()
                        if dataJSON["code"] == 0:
                            data = dataJSON["data"]
                            huaxin_trade_record_manager.PositionManager.add(data)
                    hx_logger_trade_debug.info(f"获取交易数据成功:{type_}")
                except Exception as e1:
                    if str(e1).find("超时") >= 0:
                        # 读取结果超时需要重新请求
                        trade_data_request_queue.put_nowait({"type": type_})
                    raise e1
        except Exception as e:
            hx_logger_trade_debug.exception(e)
        finally:
            # 有0.1s的间隔
            time.sleep(0.1)
def __add_data(data):
    trade_data_request_queue.put_nowait(data)
def add_delegate_list():
    __add_data({"type": "delegate_list"})
def add_deal_list():
    __add_data({"type": "deal_list"})
def add_money_list():
    __add_data({"type": "money"})
def add_position_list():
    __add_data({"type": "position_list"})
# 运行
def run():
    t1 = threading.Thread(target=lambda: __read_trade_data_queue(), daemon=True)
    t1.start()
trade/huaxin/trade_api_server.py
@@ -1,27 +1,21 @@
import hashlib
import json
import logging
import queue
import random
import socket
import socketserver
import threading
import time
import constant
import inited_data
from code_attribute import gpcode_manager
from l2 import l2_data_manager_new
from l2.huaxin import huaxin_target_codes_manager
from log_module.log import hx_logger_trade_debug
from third_data import block_info
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
from trade import trade_manager, trade_huaxin
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_record_manager
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager, \
    huaxin_trade_data_update
from utils import socket_util, tool, huaxin_util, data_export_util
trade_data_request_queue = queue.Queue()
class MyTCPServer(socketserver.TCPServer):
@@ -94,7 +88,7 @@
                                    raise Exception("现价获取失败")
                                price = prices[0][1]
                            # 下单
                            result = trade_api.order(trade_api.TRADE_DIRECTION_BUY, code, volume,
                            result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume,
                                                     round(float(price), 2))
                            if result:
                                resultJSON = result
@@ -119,9 +113,8 @@
                                            return_str = json.dumps({"code": 0})
                                    finally:
                                        # 更新委托列表
                                        trade_data_request_queue.put_nowait({"type": "delegate_list"})
                                        # 更新资金
                                        trade_data_request_queue.put_nowait({"type": "money"})
                                        huaxin_trade_data_update.add_delegate_list()
                                        huaxin_trade_data_update.add_money_list()
                                else:
                                    raise Exception(resultJSON['msg'])
                            break
@@ -136,7 +129,7 @@
                        orderSysID = codes_data.get("orderSysID")
                        accountId = codes_data.get("accountId")
                        if code and orderSysID and accountId:
                            result = trade_api.cancel_order(trade_api.TRADE_DIRECTION_BUY, code, orderSysID, True)
                            result = huaxin_trade_api.cancel_order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, orderSysID, True)
                            print("---撤单结果----")
                            print(result)
                            if result["code"] == 0:
@@ -162,10 +155,6 @@
                                return_str = json.dumps({"code": 1, "msg": "未处于可撤单状态"})
                        else:
                            return_str = json.dumps({"code": 1, "msg": "请上传代码"})
                        # 更新委托列表
                        trade_data_request_queue.put_nowait({"type": "delegate_list"})
                        # 更新资金
                        trade_data_request_queue.put_nowait({"type": "money"})
                        break
                    elif type_ == 'sell':
@@ -193,14 +182,13 @@
                            # 已现价的5档价卖
                            price = prices[0][1] - 0.04
                        result = trade_api.order(trade_api.TRADE_DIRECTION_SELL, code, volume, price)
                        result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_SELL, code, volume, price)
                        if result["code"] == 0:
                            if result["data"]["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected or (
                                    type(result["data"]["orderStatus"]) == int and result["data"]["orderStatus"] < 0):
                                raise Exception(result["data"]["statusMsg"])
                            else:
                                return_str = json.dumps({"code": 0, "msg": ""})
                                trade_data_request_queue.put_nowait({"type": "delegate_list"})
                        else:
                            raise Exception(result["msg"])
@@ -244,13 +232,13 @@
                        # 同步交易数据
                        sync_type = data_json["data"]["type"]
                        if sync_type == "delegate_list":
                            trade_data_request_queue.put_nowait({"type": "delegate_list"})
                            huaxin_trade_data_update.add_delegate_list()
                        elif sync_type == "deal_list":
                            trade_data_request_queue.put_nowait({"type": "deal_list"})
                            huaxin_trade_data_update.add_deal_list()
                        elif sync_type == "money":
                            trade_data_request_queue.put_nowait({"type": "money"})
                            huaxin_trade_data_update.add_money_list()
                        elif sync_type == "position_list":
                            trade_data_request_queue.put_nowait({"type": "position_list"})
                            huaxin_trade_data_update.add_position_list()
                        return_str = json.dumps(
                            {"code": 0, "data": {}, "msg": ""})
                    elif type_ == "get_huaxin_subscript_codes":
@@ -319,32 +307,32 @@
                    elif type_ == 'test':
                        # 卖出
                        # trade_api.order(trade_api.TRADE_DIRECTION_SELL, "600854", 100, 5.45)
                        result = trade_api.get_deal_list()
                        result = huaxin_trade_api.get_deal_list()
                        print("\n\n---成交列表----")
                        for d in result["data"]:
                            print(d)
                        result = trade_api.get_delegate_list(True)
                        result = huaxin_trade_api.get_delegate_list(True)
                        print("\n\n---可撤委托----")
                        for d in result["data"]:
                            print(d)
                        result = trade_api.get_delegate_list(False)
                        result = huaxin_trade_api.get_delegate_list(False)
                        print("\n\n---全部委托----")
                        for d in result["data"]:
                            print(d)
                        result = trade_api.get_position_list()
                        result = huaxin_trade_api.get_position_list()
                        print("\n\n---持仓列表----")
                        for d in result["data"]:
                            print(d)
                        result = trade_api.get_money()
                        result = huaxin_trade_api.get_money()
                        print("\n\n---账户列表----")
                        for d in result["data"]:
                            print(d)
                    elif type_ == 'test_l2':
                        codes_data = data_json["data"]
                        result = trade_api.set_l2_codes_data(codes_data)
                        result = huaxin_trade_api.set_l2_codes_data(codes_data)
                        print("\n\n---L2设置结果----")
                        print(result)
                break
@@ -358,80 +346,6 @@
    def finish(self):
        super().finish()
def __read_trade_data_queue():
    while True:
        try:
            data = trade_data_request_queue.get()
            if data:
                type_ = data["type"]
                hx_logger_trade_debug.info(f"获取交易数据开始:{type_}")
                try:
                    if type_ == "delegate_list":
                        dataJSON = huaxin_trade_api.get_delegate_list(can_cancel=False)
                        print("获取委托列表", dataJSON)
                        if dataJSON["code"] == 0:
                            data = dataJSON["data"]
                            huaxin_trade_record_manager.DelegateRecordManager.add(data)
                            # 是否可以撤单
                            if data:
                                codes = []
                                for d in data:
                                    if huaxin_util.is_can_cancel(d["orderStatus"]):
                                        codes.append(d["securityID"])
                                        # 设置下单成功
                                        trade_huaxin.order_success(d['securityID'],
                                                                   d['accountID'],
                                                                   d['orderSysID'])
                                if codes:
                                    try:
                                        trade_manager.process_trade_delegate_data([{"code": c} for c in codes])
                                    except Exception as e:
                                        logging.exception(e)
                    elif type_ == "money":
                        dataJSON = huaxin_trade_api.get_money()
                        if dataJSON["code"] == 0:
                            data = dataJSON["data"]
                            huaxin_trade_record_manager.MoneyManager.save_data(data)
                            if data:
                                usefulMoney = data[0]["usefulMoney"]
                                # 设置可用资金
                                trade_manager.set_available_money(0, usefulMoney)
                            # 设置可用资金
                    elif type_ == "deal_list":
                        dataJSON = huaxin_trade_api.get_deal_list()
                        print("成交响应:", dataJSON)
                        if dataJSON["code"] == 0:
                            datas = dataJSON["data"]
                            huaxin_trade_record_manager.DealRecordManager.add(datas)
                            if datas:
                                tempList = [
                                    {"time": d["tradeTime"], "type": int(d['direction']), "code": d['securityID']}
                                    for d in datas]
                                try:
                                    trade_manager.process_trade_success_data(tempList)
                                except Exception as e:
                                    logging.exception(e)
                    # 持仓股
                    elif type_ == "position_list":
                        dataJSON = huaxin_trade_api.get_position_list()
                        if dataJSON["code"] == 0:
                            data = dataJSON["data"]
                            huaxin_trade_record_manager.PositionManager.add(data)
                    hx_logger_trade_debug.info(f"获取交易数据成功:{type_}")
                except Exception as e1:
                    if str(e1).find("超时") >= 0:
                        # 读取结果超时需要重新请求
                        trade_data_request_queue.put_nowait({"type": type_})
                    raise e1
        except Exception as e:
            hx_logger_trade_debug.exception(e)
        finally:
            # 有1s的间隔
            time.sleep(1)
def __set_target_codes():
@@ -453,8 +367,7 @@
def run():
    print("create TradeApiServer")
    # 拉取交易信息
    t1 = threading.Thread(target=lambda: __read_trade_data_queue(), daemon=True)
    t1.start()
    huaxin_trade_data_update.run()
    t1 = threading.Thread(target=lambda: __set_target_codes(), daemon=True)
    t1.start()
trade/huaxin/trade_server.py
@@ -21,7 +21,7 @@
from third_data.code_plate_key_manager import KPLCodeJXBlockManager
from trade import deal_big_money_manager, current_price_process_manager
from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server, huaxin_trade_api
from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server, huaxin_trade_api, huaxin_trade_data_update
from utils import socket_util
trade_data_request_queue = queue.Queue()
@@ -150,9 +150,9 @@
                            # 记录交易反馈日志
                            hx_logger_trade_callback.info(data_json)
                            # 重新请求委托列表与资金
                            trade_api_server.trade_data_request_queue.put_nowait({"type": "delegate_list"})
                            trade_api_server.trade_data_request_queue.put_nowait({"type": "money"})
                            trade_api_server.trade_data_request_queue.put_nowait({"type": "deal_list"})
                            huaxin_trade_data_update.add_delegate_list()
                            huaxin_trade_data_update.add_deal_list()
                            huaxin_trade_data_update.add_money_list()
                            # print("响应结果:", data_json['data'])
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))