1个文件已删除
14个文件已修改
12个文件已添加
| | |
| | | "passwd": "Yeshi2016@" |
| | | } |
| | | |
| | | LOG_DIR = "logs" |
| | | |
# Resolve the root path prefix for the current platform.
def get_path_prefix():
    """Return ``'D:'`` when ``is_windows()`` is truthy, otherwise ``'/home'``."""
    if is_windows():
        return 'D:'
    return '/home'
| | |
| | | |
| | | import dask |
| | | |
| | | import log |
| | | from code_attribute import gpcode_manager |
| | | from log import logger_request_debug |
| | | from log_module import log_analyse, log_export |
| | | from log_module import log_analyse, log_export, log, request_log_util |
| | | from output import limit_up_data_filter, output_util |
| | | from output.limit_up_data_filter import IgnoreCodeManager |
| | | from third_data import kpl_util, kpl_data_manager, kpl_api |
| | |
| | | def do_GET(self): |
| | | path = self.path |
| | | url = urlparse.urlparse(path) |
| | | log.request_info("DATA_SERVER_GET", f"GET 请求开始:{url.path}") |
| | | request_log_util.request_info("DATA_SERVER_GET", f"GET 请求开始:{url.path}") |
| | | try: |
| | | if url.path == "/kpl/get_limit_up_list": |
| | | response_data = self.__get_limit_up_list() |
| | |
| | | result = hosting_api_util.get_from_data_server(url.path, ps_dict) |
| | | self.__send_response(result) |
| | | finally: |
| | | log.request_info("DATA_SERVER_GET", f"GET 请求结束") |
| | | request_log_util.request_info("DATA_SERVER_GET", f"GET 请求结束") |
| | | |
| | | def do_POST(self): |
| | | path = self.path |
| | | url = urlparse.urlparse(path) |
| | | log.request_info("DATA_SERVER_POST", f"POST 请求开始:{url.path}") |
| | | request_log_util.request_info("DATA_SERVER_POST", f"POST 请求开始:{url.path}") |
| | | try: |
| | | if url.path == "/upload_kpl_data": |
| | | # 接受开盘啦数据 |
| | |
| | | result_str = self.__process_kpl_data(params) |
| | | self.__send_response(result_str) |
| | | finally: |
| | | log.request_info("DATA_SERVER_POST", f"POST 请求结束") |
| | | request_log_util.request_info("DATA_SERVER_POST", f"POST 请求结束") |
| | | |
| | | def __process_kpl_data(self, data): |
| | | data = json.loads(json.dumps(data).replace("概念", "")) |
| | |
| | | # 把连接参数定义成字典 |
| | | import constant |
| | | |
| | | config = constant.MYSQL_CONFIG |
| | | |
| | | class Mysqldb: |
| | | # 初始化方法 |
| | | def __init__(self): |
| | | def __init__(self, config=constant.MYSQL_CONFIG): |
| | | self.config = config |
| | | # 初始化方法中调用连接数据库的方法 |
| | | self.conn = self.get_conn() |
| | | # 调用获取游标的方法 |
| | |
| | | # 连接数据库的方法 |
| | | def get_conn(self): |
| | | # **config代表不定长参数 |
| | | conn = pymysql.connect(**config) |
| | | conn = pymysql.connect(**self.config) |
| | | return conn |
| | | |
| | | # 获取游标 |
| | |
| | | # 插入单条数据 |
| | | mysqldb.execute("insert into clients(account,pwd,rule) values(%s,%s,%s)", ("test", 123456, "\"123")) |
| | | # 插入多条数据 |
| | | mysqldb.execute_many("insert into clients(account,pwd,rule) values(%s,%s,%s)", [("test", 123456, "\"123"),("test", 123456, "\"123")]) |
| | | mysqldb.execute_many("insert into clients(account,pwd,rule) values(%s,%s,%s)", |
| | | [("test", 123456, "\"123"), ("test", 123456, "\"123")]) |
| | |
| | | import redis |
| | | |
| | | import constant |
| | | from log import logger_redis_debug |
| | | from log_module.log import logger_redis_debug |
| | | |
| | | config = constant.REDIS_CONFIG |
| | | |
New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import json |
| | | import socket |
| | | |
| | | from utils import socket_util |
| | | |
SERVER_IP = "127.0.0.1"
SERVER_PORT = 10008


class SendResponseSkManager:
    """Keep one client socket per response type and send framed responses on it.

    Sockets are created lazily by ``get_send_response_sk`` and cached in a
    class-level dict keyed by the caller-supplied ``type`` value.  Messages are
    framed as ``##`` + an 8-digit zero-padded length + the payload bytes.
    """

    # Cache of connected sockets, keyed by response type.
    __send_response_sk_dict = {}

    @classmethod
    def get_send_response_sk(cls, type):
        """Return the cached socket for ``type``, connecting a new one if absent."""
        if type not in cls.__send_response_sk_dict:
            cls.__send_response_sk_dict[type] = cls.create_send_response_sk()
        return cls.__send_response_sk_dict[type]

    @classmethod
    def del_send_response_sk(cls, type_):
        """Forget the socket cached for ``type_`` and close it (best effort)."""
        sk = cls.__send_response_sk_dict.pop(type_, None)
        if sk is None:
            return
        try:
            sk.close()
        except OSError:
            # The socket is being discarded anyway; a failed close is not actionable.
            pass

    @classmethod
    def create_send_response_sk(cls, addr=SERVER_IP, port=SERVER_PORT):
        """Open and return a new socket connected to ``(addr, port)``."""
        client = socket.socket()
        client.connect((addr, port))
        return client

    @classmethod
    def send_error_response(cls, type, request_id, client_id, msg):
        """Send a ``{"code": 1, "msg": msg}`` response for the given request."""
        cls.send_normal_response(type, cls.load_response(client_id, request_id, {"code": 1, "msg": msg}))

    @classmethod
    def __send_normal_response(cls, sk, msg):
        """Send ``msg`` (bytes) on ``sk`` and report whether the peer answered code 0.

        Returns ``True`` only when a non-empty reply is received and its JSON
        ``code`` field equals 0; otherwise ``False``.
        """
        # sendall() keeps writing until the whole frame is out; send() may stop
        # after a partial write and leave the peer with a truncated frame.
        sk.sendall(cls.format_response(msg))
        result, _header_str = socket_util.recv_data(sk)
        if not result:
            return False
        return json.loads(result).get("code") == 0

    # Send a message
    @classmethod
    def send_normal_response(cls, type, msg):
        """Send ``msg`` on the socket cached for ``type``, retrying once.

        If the first attempt is not acknowledged with code 0 the message is sent
        again on the same cached socket.  If the connection was reset or the pipe
        is broken, the cached socket is dropped, a new one is connected and the
        message is sent once more; an error from that final attempt propagates.

        Returns the boolean result of the last attempt.
        """
        try:
            if cls.__send_normal_response(cls.get_send_response_sk(type), msg):
                return True
            # Not acknowledged: send again.
            return cls.__send_normal_response(cls.get_send_response_sk(type), msg)
        except (ConnectionResetError, BrokenPipeError):
            # The cached socket is dead: reconnect and try one last time.
            cls.del_send_response_sk(type)
            return cls.__send_normal_response(cls.get_send_response_sk(type), msg)

    @classmethod
    def load_response(cls, client_id, request_id, data_json):
        """Serialize a response envelope to UTF-8 encoded JSON bytes."""
        envelope = {"type": "response", "data": data_json, "client_id": client_id,
                    "request_id": request_id}
        return json.dumps(envelope).encode('utf-8')

    @classmethod
    def format_response(cls, data_bytes):
        """Prefix ``data_bytes`` with the ``##`` + 8-digit length header."""
        slen = '##%08d' % len(data_bytes)
        return slen.encode("utf-8") + data_bytes
New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | """ |
| | | 命令管理器 |
| | | """ |
| | | import concurrent.futures |
| | | import logging |
| | | import multiprocessing |
| | | import threading |
| | | |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_local_huaxin_trade_debug, logger_trade, logger_local_huaxin_contact_debug, logger_info |
| | | |
# Heartbeat message
MSG_TYPE_HEART = "heart"
# Command message
MSG_TYPE_CMD = "cmd"

# Client/command type identifiers compared against the message type when dispatching.
CLIENT_TYPE_TRADE = "trade"
CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
CLIENT_TYPE_DEAL_LIST = "deal_list"
CLIENT_TYPE_POSITION_LIST = "position_list"
CLIENT_TYPE_MONEY = "money"
CLIENT_TYPE_DEAL = "deal"

CLIENT_TYPE_CMD_L2 = "l2_cmd"

# Heartbeat interval (unit is not shown here; presumably seconds, confirm against the user)
HEART_SPACE_TIME = 3
| | | |
| | | |
class TradeActionCallback(object):
    """Callback interface for trade commands; every hook is a no-op by default."""

    def OnTrade(self, client_id, request_id, sk, type_, data):
        """Trade request."""

    def OnDelegateList(self, client_id, request_id, sk, can_cancel):
        """Delegate (order) list request."""

    def OnDealList(self, client_id, request_id, sk):
        """Deal list request."""

    def OnPositionList(self, client_id, request_id, sk):
        """Position list request."""

    def OnMoney(self, client_id, request_id, sk):
        """Fund information request."""

    def OnTest(self, client_id, request_id, data, sk):
        """Test request."""
| | | |
| | | |
class L2ActionCallback(object):
    """Callback interface for L2 commands; the hook is a no-op by default."""

    def OnSetL2Position(self, codes_data):
        """Handle a request to listen to L2 data for ``codes_data``."""
| | | |
| | | |
# Trade command manager
class TradeCommandManager:
    """Singleton that reads command messages from queues and dispatches them.

    ``init`` stores the callback object and the two queues on the class;
    ``run`` then consumes both queues and routes every message to the matching
    ``TradeActionCallback`` hook through ``process_command``.
    """

    trade_client_dict = {}
    _instance = None
    process_command_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)

    # Populated by init(). Defaulting to None means run() before init() returns
    # immediately (run_process_command ignores a None queue) instead of raising
    # AttributeError.
    action_callback = None
    queue_strategy_w_trade_r = None
    queue_strategy_w_trade_r_for_query = None

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use.

        Constructor arguments are accepted for compatibility but ignored:
        forwarding them to ``object.__new__`` raises ``TypeError``.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def init(cls, trade_action_callback: "TradeActionCallback",
             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
             queue_strategy_w_trade_r_for_query: multiprocessing.Queue):
        """Store the callback object and the trade/query command queues."""
        cls.action_callback = trade_action_callback
        cls.queue_strategy_w_trade_r = queue_strategy_trade_read_for_trade
        cls.queue_strategy_w_trade_r_for_query = queue_strategy_w_trade_r_for_query

    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
        """Dispatch one command message to the callback hook selected by ``_type``.

        ``result_json`` must contain ``data``; ``request_id`` is optional.
        Unknown types are ignored.  Any exception raised while dispatching is
        logged and swallowed so that the consuming loop keeps running.
        """
        async_log_util.info(logger_local_huaxin_contact_debug, f"process_command: {result_json}")
        try:
            data = result_json["data"]
            request_id = result_json.get('request_id')

            if _type == CLIENT_TYPE_TRADE:
                # Trade
                ctype = data["trade_type"]
                async_log_util.info(logger_trade, f"交易开始:{request_id}")
                cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data)
                async_log_util.info(logger_trade, f"交易结束:{request_id}")
            elif _type == CLIENT_TYPE_MONEY:
                cls.action_callback.OnMoney(client_id, request_id, sk)
            elif _type == CLIENT_TYPE_DEAL_LIST:
                cls.action_callback.OnDealList(client_id, request_id, sk)
            elif _type == CLIENT_TYPE_DELEGATE_LIST:
                can_cancel = data["can_cancel"]
                cls.action_callback.OnDelegateList(client_id, request_id, sk, can_cancel)
            elif _type == CLIENT_TYPE_POSITION_LIST:
                cls.action_callback.OnPositionList(client_id, request_id, sk)
            elif _type == "test":
                cls.action_callback.OnTest(client_id, request_id, data, sk)
        except Exception as e:
            async_log_util.error(logger_local_huaxin_contact_debug, f"process_command出错: {result_json}")
            # Record the failure itself as well; the message above only shows the input.
            async_log_util.exception(logger_local_huaxin_contact_debug, e)

    @classmethod
    def run_process_command(cls, queue_strategy_trade: multiprocessing.Queue):
        """Consume ``queue_strategy_trade`` forever, dispatching each message.

        Returns immediately when the queue is ``None``.  A failure while
        handling one message is logged and the loop continues.
        """
        if queue_strategy_trade is None:
            return
        # Receive local commands
        try:
            while True:
                try:
                    val = queue_strategy_trade.get()
                    if not val:
                        continue
                    _type = val["type"]
                    if _type != "test":
                        async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                    # TODO: test logging
                    logger_info.info(f"接受到信息: {val}")
                    cls.process_command(_type, None, val)
                except Exception as e:
                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
                    logging.exception(e)
        except Exception as e:
            # Reached only if the per-message error handling itself fails.
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    def run(self, blocking=True):
        """Start consuming the trade queue and the query queue.

        With ``blocking=True`` the query queue is served on a daemon thread and
        the trade queue on the calling thread; otherwise both are served on
        daemon threads and the call returns at once.
        """

        def spawn(command_queue):
            threading.Thread(target=self.run_process_command, args=(command_queue,),
                             daemon=True).start()

        if blocking:
            spawn(self.queue_strategy_w_trade_r_for_query)
            self.run_process_command(self.queue_strategy_w_trade_r)
        else:
            # Receive commands
            spawn(self.queue_strategy_w_trade_r)
            spawn(self.queue_strategy_w_trade_r_for_query)
| | | |
# L2 command manager
class L2CommandManager:
    """Routes L2 command messages to the registered callback object."""

    action_callback = None

    @classmethod
    def init(cls, l2_action_callback):
        """Register the object whose ``OnSetL2Position`` receives L2 commands."""
        cls.action_callback = l2_action_callback

    @classmethod
    def process_command(cls, _type, client_id, result_json):
        """Forward ``result_json["data"]`` when the message type is the L2 command type."""
        payload = result_json["data"]
        if result_json["type"] != CLIENT_TYPE_CMD_L2:
            return
        cls.action_callback.OnSetL2Position(payload)
| | | |
| | | |
# Manual entry point: build the manager, start its run loop, then wait on
# input() so the process stays alive. init() is not called here, so this
# script configures no callback and no queues.
if __name__ == "__main__":
    manager = TradeCommandManager("127.0.0.1", 10008, None)
    manager.run()
    input()
New file |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import concurrent.futures |
| | | import threading |
| | | import time |
| | | |
| | | from loguru import logger |
| | | |
| | | import traderapi |
| | | from huaxin_client import trade_manager |
| | | from huaxin_client.command_manager import TradeCommandManager |
| | | from huaxin_client.trade_manager import TradeSimpleApi |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_system, printlog, logger_local_huaxin_trade_debug, logger_info |
| | | |
# When True, the account settings below are replaced by the simulation account.
# NOTE(review): account identifiers and passwords are hardcoded in this module;
# consider loading them from configuration kept outside version control.
TEST_TRADE = True

######## Type B ########
UserID = '388000013349'
# Login password
Password = '110808'
# Investor account
InvestorID = '388000013349'
# Brokerage department code
DepartmentID = '0003'
# Fund account
AccountID = '388000013349'
# Shanghai market shareholder account
SSE_ShareHolderID = 'A641420991'
# Shenzhen market shareholder account
SZSE_ShareHolderID = '0345104949'

LOCAL_IP = "192.168.84.75"
FRONT_ADDRESS = "tcp://192.168.84.31:6500"
FRONT_ADDRESS1 = "tcp://192.168.84.32:26500"

if TEST_TRADE:
    # Simulation environment: override the account settings above.
    UserID = '00032047'
    # Login password
    Password = '59009218'
    # Investor account
    InvestorID = '00032047'
    # Brokerage department code
    DepartmentID = '0003'
    # Fund account
    AccountID = '00032047'
    # Shanghai market shareholder account
    SSE_ShareHolderID = 'A00032047'
    # Shenzhen market shareholder account
    SZSE_ShareHolderID = '700032047'
| | | |
| | | |
class TraderSpi(traderapi.CTORATstpTraderSpi):
    """Trader SPI implementation: logs in on connect and relays results.

    Order, cancel and query results are handed to ``callback`` as
    ``callback(data_type, request_id, payload)`` on ``call_back_thread_pool``.
    Multi-part query responses are accumulated per request id and delivered as
    one list when the final (``bIsLast``) part arrives.
    """

    def __init__(self, api, callback):
        traderapi.CTORATstpTraderSpi.__init__(self)
        self.__api = api
        self.__front_id = 0
        self.__session_id = 0
        self.__data_callback = callback
        # Per-request accumulators for multi-part query responses, keyed by nRequestID.
        self.__temp_order_list_dict = {}
        self.__temp_trade_list_dict = {}
        self.__temp_position_list_dict = {}
        self.__temp_money_account_list_dict = {}
        self.call_back_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    def __submit(self, data_type, request_id, payload):
        """Hand ``payload`` to the data callback on the callback thread pool."""
        self.call_back_thread_pool.submit(self.__data_callback, data_type, request_id, payload)

    def __collect_rows(self, cache, data_type, request_id, is_last, build_row):
        """Accumulate one query row, or deliver the accumulated list on the last part.

        ``build_row`` is only called for non-final parts, so the field object
        is never read on the terminating callback.
        """
        rows = cache.setdefault(request_id, [])
        if not is_last:
            rows.append(build_row())
            return
        cache.pop(request_id, None)
        self.__submit(data_type, request_id, rows)

    def OnFrontConnected(self) -> "void":
        """On connect, request the connection (terminal) information."""
        TradeSimpleApi.req_id += 1

        ret = self.__api.ReqGetConnectionInfo(TradeSimpleApi.req_id)
        if ret != 0:
            logger_system.info('ReqGetConnectionInfo fail, ret[%d]' % ret)

    def OnFrontDisconnected(self, nReason: "int") -> "void":
        printlog('OnFrontDisconnected: [%d]' % nReason)

    def OnRspGetConnectionInfo(self, pConnectionInfoField: "CTORATstpConnectionInfoField",
                               pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        """Log the connection information and send the login request."""
        if pRspInfoField.ErrorID != 0:
            printlog('GetConnectionInfo fail, [%d] [%d] [%s]!!!' % (
                nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            return

        logger.info('inner_ip_address[%s]' % pConnectionInfoField.InnerIPAddress)
        logger.info('inner_port[%d]' % pConnectionInfoField.InnerPort)
        logger.info('outer_ip_address[%s]' % pConnectionInfoField.OuterIPAddress)
        logger.info('outer_port[%d]' % pConnectionInfoField.OuterPort)
        logger.info('mac_address[%s]' % pConnectionInfoField.MacAddress)

        # Request login
        login_req = traderapi.CTORATstpReqUserLoginField()

        # Log in by user code and password. AuthMode is left unset for password
        # authentication.
        login_req.LogInAccount = UserID
        login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_UserID
        login_req.Password = Password

        # Terminal information: UserProductInfo is the terminal name, and
        # TerminalInfo is the terminal description string sent with the login.
        login_req.UserProductInfo = 'jiabei'
        login_req.TerminalInfo = f'PC;IIP=NA;IPORT=NA;LIP={LOCAL_IP};MAC=5C6F69CC2B40;HD=004bc76004aff0882b9052ba0eb00506;@jiabei'

        TradeSimpleApi.req_id += 1
        ret = self.__api.ReqUserLogin(login_req, TradeSimpleApi.req_id)
        if ret != 0:
            printlog('ReqUserLogin fail, ret[%d]' % ret)

    def OnRspUserLogin(self, pRspUserLoginField: "CTORATstpRspUserLoginField", pRspInfoField: "CTORATstpRspInfoField",
                       nRequestID: "int") -> "void":
        """Record the front and session ids on a successful login."""
        if pRspInfoField.ErrorID == 0:
            logger_system.info('Login success! [%d]' % nRequestID)
            self.__front_id = pRspUserLoginField.FrontID
            self.__session_id = pRspUserLoginField.SessionID
        else:
            logger_system.info('Login fail!!! [%d] [%d] [%s]'
                               % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRspUserPasswordUpdate(self, pUserPasswordUpdateField: "CTORATstpUserPasswordUpdateField",
                                pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            printlog('OnRspUserPasswordUpdate: OK! [%d]' % nRequestID)
        else:
            printlog('OnRspUserPasswordUpdate: Error! [%d] [%d] [%s]'
                     % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRspOrderInsert(self, pInputOrderField: "CTORATstpInputOrderField", pRspInfoField: "CTORATstpRspInfoField",
                         nRequestID: "int") -> "void":
        """Log the order-insert response; on error, report it with orderStatus -1."""
        try:
            if pRspInfoField.ErrorID == 0:
                async_log_util.info(logger_local_huaxin_trade_debug,
                                    '[%d] OnRspOrderInsert: OK! [%d]' % (round(time.time() * 1000), nRequestID))
            else:
                async_log_util.error(logger_local_huaxin_trade_debug,
                                     f"OnRspOrderInsert 报单出错:{pRspInfoField.ErrorID}-{pRspInfoField.ErrorMsg}")
                self.__submit(trade_manager.TYPE_ORDER, nRequestID,
                              {"sinfo": pInputOrderField.SInfo,
                               "orderStatus": -1,
                               "orderStatusMsg": pRspInfoField.ErrorMsg})
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    # Cancel-order response
    def OnRspOrderAction(self, pInputOrderActionField: "CTORATstpInputOrderActionField",
                         pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        """Report the cancel result: ``cancel`` is 1 on success, 0 with error details otherwise."""
        try:
            if pRspInfoField.ErrorID == 0:
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: OK! [%d]' % nRequestID)
                self.__submit(trade_manager.TYPE_CANCEL_ORDER, nRequestID,
                              {"sinfo": pInputOrderActionField.SInfo,
                               "orderSysID": pInputOrderActionField.OrderSysID,
                               "cancel": 1})
            else:
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: Error! [%d] [%d] [%s]'
                                    % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                self.__submit(trade_manager.TYPE_CANCEL_ORDER, nRequestID,
                              {"sinfo": pInputOrderActionField.SInfo,
                               "orderSysID": pInputOrderActionField.OrderSysID,
                               "cancel": 0, "errorID": pRspInfoField.ErrorID,
                               "errorMsg": pRspInfoField.ErrorMsg})
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    # Cancel-order error return
    def OnErrRtnOrderAction(self, pInputOrderActionField: "CTORATstpInputOrderActionField",
                            pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        try:
            if pInputOrderActionField and pRspInfoField:
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnErrRtnOrderAction: Error! [%d] [%d] [%d] [%s]'
                                    % (nRequestID, pInputOrderActionField.OrderSysID,
                                       pRspInfoField.ErrorID,
                                       pRspInfoField.ErrorMsg))
        except Exception:
            async_log_util.info(logger_local_huaxin_trade_debug, "OnErrRtnOrderAction: 撤单出错")

    def OnRspInquiryJZFund(self, pRspInquiryJZFundField: "CTORATstpRspInquiryJZFundField",
                           pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        """Intentionally a no-op."""

    def OnRspTransferFund(self, pInputTransferFundField: "CTORATstpInputTransferFundField",
                          pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        """Intentionally a no-op."""

    def OnRtnOrder(self, pOrderField: "CTORATstpOrderField") -> "void":
        """Log the order return and forward it unless the status is still Unknown."""
        try:
            async_log_util.info(logger_local_huaxin_trade_debug,
                                '[%d] OnRtnOrder: SInfo[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] LimitPrice[%.2f] VolumeTotalOriginal[%d] OrderSysID[%s] OrderStatus[%s] InsertTime[%s]'
                                % (round(time.time() * 1000), pOrderField.SInfo, pOrderField.InvestorID,
                                   pOrderField.SecurityID,
                                   pOrderField.OrderRef, pOrderField.OrderLocalID,
                                   pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID,
                                   pOrderField.OrderStatus, pOrderField.InsertTime))
            if pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_Unknown:
                # Orders whose status is still Unknown are not forwarded.
                return
            order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID,
                          "orderLocalID": pOrderField.OrderLocalID,
                          "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
                          "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
                          "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                          "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                          "orderRef": pOrderField.OrderRef, "turnover": pOrderField.Turnover,
                          "volume": pOrderField.VolumeTotalOriginal, "volumeTraded": pOrderField.VolumeTraded,
                          "orderStatus": pOrderField.OrderStatus,
                          "orderSubmitStatus": pOrderField.OrderSubmitStatus,
                          "statusMsg": pOrderField.StatusMsg}
            # Order returns are not tied to a request, so request id 0 is used.
            self.__submit(trade_manager.TYPE_ORDER, 0, order_data)
        except Exception as e:
            async_log_util.error(logger_local_huaxin_trade_debug, "OnRtnOrder 出错")
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    # Trade return; pTradeField is a CTORATstpTradeField object
    def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void":
        """Log the trade return.

        The class used to define this method twice; the later empty definition
        overrode this one, so the logging never ran.
        """
        try:
            async_log_util.info(logger_local_huaxin_trade_debug,
                                'OnRtnTrade: TradeID[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] Price[%.2f] Volume[%d]'
                                % (pTradeField.TradeID, pTradeField.InvestorID, pTradeField.SecurityID,
                                   pTradeField.OrderRef, pTradeField.OrderLocalID, pTradeField.Price,
                                   pTradeField.Volume))
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    def OnRtnMarketStatus(self, pMarketStatusField: "CTORATstpMarketStatusField") -> "void":
        """Log a market status change.

        Market ids: TORA_TSTP_MKD_SHA(1) Shanghai A shares, TORA_TSTP_MKD_SZA(2)
        Shenzhen A shares, TORA_TSTP_MKD_BJMain(a) Beijing main board.
        Market status: TORA_TSTP_MST_UnKnown(#) unknown,
        TORA_TSTP_MST_BeforeTrading(0) before open, TORA_TSTP_MST_Continous(1)
        continuous trading, TORA_TSTP_MST_Closed(2) closed,
        TORA_TSTP_MST_OpenCallAuction(3) opening call auction.
        """
        try:
            logger.info('OnRtnMarketStatus: MarketID[%s] MarketStatus[%s]'
                        % (pMarketStatusField.MarketID, pMarketStatusField.MarketStatus))
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    def OnRspQrySecurity(self, pSecurityField: "CTORATstpSecurityField", pRspInfoField: "CTORATstpRspInfoField",
                         nRequestID: "int", bIsLast: "bool") -> "void":
        if bIsLast != 1:
            logger.info(
                'OnRspQrySecurity[%d]: SecurityID[%s] SecurityName[%s] MarketID[%s] OrderUnit[%s] OpenDate[%s] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]'
                % (nRequestID, pSecurityField.SecurityID, pSecurityField.SecurityName, pSecurityField.MarketID,
                   pSecurityField.OrderUnit, pSecurityField.OpenDate, pSecurityField.UpperLimitPrice,
                   pSecurityField.LowerLimitPrice))
        else:
            logger.info('查询合约结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRspQryInvestor(self, pInvestorField: "CTORATstpInvestorField", pRspInfoField: "CTORATstpRspInfoField",
                         nRequestID: "int", bIsLast: "bool") -> "void":
        if bIsLast != 1:
            logger.info('OnRspQryInvestor[%d]: InvestorID[%s] InvestorName[%s] Operways[%s]'
                        % (nRequestID, pInvestorField.InvestorID, pInvestorField.InvestorName,
                           pInvestorField.Operways))
        else:
            logger.info('查询投资者结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRspQryShareholderAccount(self, pShareholderAccountField: "CTORATstpShareholderAccountField",
                                   pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int",
                                   bIsLast: "bool") -> "void":
        if bIsLast != 1:
            logger_local_huaxin_trade_debug.info(
                'OnRspQryShareholderAccount[%d]: InvestorID[%s] ExchangeID[%s] ShareholderID[%s]'
                % (nRequestID, pShareholderAccountField.InvestorID, pShareholderAccountField.ExchangeID,
                   pShareholderAccountField.ShareholderID))
        else:
            logger.info('查询股东账户结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))

    def OnRspQryTradingAccount(self, pTradingAccountField: "CTORATstpTradingAccountField",
                               pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int", bIsLast: "bool") -> "void":
        """Accumulate fund-account rows; deliver them as TYPE_LIST_MONEY on the last part."""
        try:
            self.__collect_rows(
                self.__temp_money_account_list_dict, trade_manager.TYPE_LIST_MONEY, nRequestID, bIsLast,
                lambda: {"departmentID": pTradingAccountField.DepartmentID,
                         "investorID": pTradingAccountField.InvestorID,
                         "accountID": pTradingAccountField.AccountID,
                         "currencyID": pTradingAccountField.CurrencyID,
                         "usefulMoney": round(pTradingAccountField.UsefulMoney, 2),
                         "frozenCash": round(pTradingAccountField.FrozenCash, 2),
                         "fetchLimit": round(pTradingAccountField.FetchLimit, 2),
                         "preDeposit": round(pTradingAccountField.PreDeposit, 2)})
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    def OnRspQryOrder(self, pOrderField: "CTORATstpOrderField", pRspInfoField: "CTORATstpRspInfoField",
                      nRequestID: "int", bIsLast: "bool") -> "void":
        """Accumulate order rows; deliver them as TYPE_LIST_DELEGATE on the last part."""
        try:
            self.__collect_rows(
                self.__temp_order_list_dict, trade_manager.TYPE_LIST_DELEGATE, nRequestID, bIsLast,
                lambda: {"securityID": pOrderField.SecurityID, "orderLocalID": pOrderField.OrderLocalID,
                         "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
                         "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
                         "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                         "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                         "turnover": pOrderField.Turnover, "orderRef": pOrderField.OrderRef,
                         "volume": pOrderField.VolumeTotalOriginal,
                         "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus,
                         "orderSubmitStatus": pOrderField.OrderSubmitStatus,
                         "statusMsg": pOrderField.StatusMsg})
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    def OnRspQryPosition(self, pPositionField: "CTORATstpPositionField", pRspInfoField: "CTORATstpRspInfoField",
                         nRequestID: "int", bIsLast: "bool") -> "void":
        """Accumulate position rows; deliver them as TYPE_LIST_POSITION on the last part."""
        try:
            self.__collect_rows(
                self.__temp_position_list_dict, trade_manager.TYPE_LIST_POSITION, nRequestID, bIsLast,
                lambda: {"investorID": pPositionField.InvestorID, "tradingDay": pPositionField.TradingDay,
                         "securityName": pPositionField.SecurityName,
                         "securityID": pPositionField.SecurityID, "historyPos": pPositionField.HistoryPos,
                         "historyPosFrozen": pPositionField.HistoryPosFrozen,
                         "todayBSPos": pPositionField.TodayBSPos,
                         "todayBSPosFrozen": pPositionField.TodayBSPosFrozen,
                         "historyPosPrice": pPositionField.HistoryPosPrice,
                         "totalPosCost": pPositionField.TotalPosCost,
                         "prePosition": pPositionField.PrePosition,
                         "availablePosition": pPositionField.AvailablePosition,
                         "currentPosition": pPositionField.CurrentPosition,
                         "openPosCost": pPositionField.OpenPosCost,
                         "todayCommission": pPositionField.TodayCommission,
                         "todayTotalBuyAmount": pPositionField.TodayTotalBuyAmount,
                         "todayTotalSellAmount": pPositionField.TodayTotalSellAmount})
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)

    # Trade query response; pTradeField is a CTORATstpTradeField object
    def OnRspQryTrade(self, pTradeField: "CTORATstpTradeField", pRspInfoField: "CTORATstpRspInfoField",
                      nRequestID: "int", bIsLast: "bool") -> "void":
        """Accumulate trade rows; deliver them as TYPE_LIST_TRADED on the last part."""
        try:
            self.__collect_rows(
                self.__temp_trade_list_dict, trade_manager.TYPE_LIST_TRADED, nRequestID, bIsLast,
                lambda: {"tradeID": pTradeField.TradeID, "securityID": pTradeField.SecurityID,
                         "orderLocalID": pTradeField.OrderLocalID,
                         "direction": pTradeField.Direction, "orderSysID": pTradeField.OrderSysID,
                         "price": pTradeField.Price,
                         "tradeTime": pTradeField.TradeTime,
                         "volume": pTradeField.Volume, "tradeDate": pTradeField.TradeDate,
                         "tradingDay": pTradeField.TradingDay,
                         "pbuID": pTradeField.PbuID, "accountID": pTradeField.AccountID})
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
| | | |
| | | |
| | | def run(queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query, queue_result):
| | |     """
| | |     Start the trading client: create the trader API, register the SPI callback and the
| | |     front addresses, start the async log thread and the trade command manager.
| | |     This function never returns (it ends in an endless sleep loop).
| | |
| | |     :param queue_strategy_w_trade_r: queue passed to TradeCommandManager.init (second argument)
| | |     :param queue_strategy_w_trade_r_for_query: queue passed to TradeCommandManager.init (third argument)
| | |     :param queue_result: queue handed to trade_manager.set_result_read_queue
| | |     :return: does not return
| | |     """
| | |     logger_system.info("交易初始化")
| | |     # ----------- initialise the trading environment -----------
| | |     trade_manager.set_result_read_queue(queue_result)
| | |     api = traderapi.CTORATstpTraderApi.CreateTstpTraderApi('./flow', False)
| | |     # Create the callback (SPI) object
| | |     spi = TraderSpi(api, trade_manager.traderapi_callback)
| | |
| | |     # Register the callback interface
| | |     api.RegisterSpi(spi)
| | |
| | |     # Several front addresses can be registered at once, separated by commas
| | |     # api.RegisterFront('tcp://10.0.1.101:6500,tcp://10.0.1.101:26500')
| | |     # Name-server addresses can be registered as well (comma separated for several)
| | |     # api.RegisterNameServer('tcp://10.0.1.101:52370')
| | |     # api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')
| | |
| | |     if not TEST_TRADE:  # live environment: register the configured front addresses
| | |         # Register the front addresses one by one
| | |         ## class-B servers ##
| | |         logger.info(f"注册交易地址:{FRONT_ADDRESS}/{FRONT_ADDRESS1}")
| | |         api.RegisterFront(FRONT_ADDRESS)  # primary address of the live environment
| | |         api.RegisterFront(FRONT_ADDRESS1)  # backup address of the live environment
| | |
| | |         ## class-A servers ##
| | |         # api.RegisterFront("tcp://10.224.123.143:6500")  # primary address of the live environment
| | |         # api.RegisterFront("tcp://10.224.123.147:26500")  # backup address of the live environment
| | |
| | |         # TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # simulated trading environment
| | |         # TD_TCP_FrontAddress = "tcp://210.14.72.15:4400"  # 24-hour environment, set A
| | |         # TD_TCP_FrontAddress="tcp://210.14.72.16:9500" # 24-hour environment, set B
| | |         # api.RegisterFront(TD_TCP_FrontAddress)
| | |         # Several addresses, comma separated: api.RegisterFront("tcp://10.0.1.101:6500,tcp://10.0.1.101:26500")
| | |         # printlog("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
| | |
| | |     else:  # test mode: a single hard-coded simulation front address is registered
| | |         TD_TCP_FrontAddress = "tcp://210.14.72.21:4400"  # simulated trading environment
| | |         # TD_TCP_FrontAddress="tcp://210.14.72.15:4400" # 24-hour environment, set A
| | |         # TD_TCP_FrontAddress="tcp://210.14.72.16:9500" # 24-hour environment, set B
| | |         api.RegisterFront(TD_TCP_FrontAddress)
| | |         # Several addresses, comma separated: api.RegisterFront("tcp://10.0.1.101:6500,tcp://10.0.1.101:26500")
| | |         printlog("TD_TCP_FensAddress[sim or 24H]::%s\n" % TD_TCP_FrontAddress)
| | |
| | |     # Subscribe to the private topic
| | |     api.SubscribePrivateTopic(traderapi.TORA_TERT_QUICK)
| | |     # Subscribe to the public topic
| | |     api.SubscribePublicTopic(traderapi.TORA_TERT_QUICK)
| | |     # Start the API
| | |     api.Init()
| | |
| | |     # Background thread that drains the shared async log queue
| | |     threading.Thread(target=async_log_util.run_sync, daemon=True).start()
| | |
| | |     data_callback = trade_manager.MyTradeActionCallback(UserID, Password, SZSE_ShareHolderID, SSE_ShareHolderID, api)
| | |     # Original note: "command parsing does not need to be run"
| | |     tradeCommandManager = TradeCommandManager()
| | |     tradeCommandManager.init(
| | |         data_callback,
| | |         queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query)
| | |     logger_info.debug("全部初始化完成")
| | |     tradeCommandManager.run(True)
| | |     # Keep the calling thread alive indefinitely
| | |     while True:
| | |         time.sleep(2)
| | | |
| | | |
| | | if __name__ == "__main__":
| | |     # No action is taken when this module is executed directly.
| | |     pass
New file |
| | |
| | | """ |
| | | 华鑫交易管理 |
| | | """ |
| | | import concurrent.futures |
| | | import json |
| | | import logging |
| | | import time |
| | | import multiprocessing |
| | | |
| | | import traderapi |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from huaxin_client.command_manager import TradeActionCallback |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_trade, logger_local_huaxin_trade_debug, printlog |
| | | from utils import tool, socket_util |
| | | |
| | | # Switch read by TradeSimpleApi.buy(): when falsy, buy() returns without placing an order.
| | | ENABLE_ORDER = True
| | |
| | | # Type codes attached to trade callbacks / responses
| | | TYPE_ORDER = 0
| | | TYPE_CANCEL_ORDER = 1
| | | TYPE_LIST_DELEGATE = 2
| | | TYPE_LIST_TRADED = 3
| | | TYPE_LIST_POSITION = 4
| | | TYPE_LIST_MONEY = 5
| | | # Deal (trade executed)
| | | TYPE_DEAL = 6
| | |
| | | # Queue that send_response() puts results on; assigned through set_result_read_queue().
| | | __queue_result: multiprocessing.Queue = None
| | | |
| | | |
| | | def set_result_read_queue(queue_result):
| | |     """
| | |     Set the queue that results are written to.
| | |     The queue is stored in the module-level ``__queue_result`` and used by send_response().
| | |     :param queue_result: queue object that will receive the results
| | |     :return: None
| | |     """
| | |     global __queue_result
| | |     __queue_result = queue_result
| | | |
| | | |
| | | def __send_response(type, data_bytes): |
| | | sk = SendResponseSkManager.create_send_response_sk() |
| | | try: |
| | | data_bytes = socket_util.load_header(data_bytes) |
| | | sk.sendall(data_bytes) |
| | | result, header_str = socket_util.recv_data(sk) |
| | | result = json.loads(result) |
| | | if result["code"] != 0: |
| | | raise Exception(result['msg']) |
| | | finally: |
| | | sk.close() |
| | | |
| | | |
| | | def send_response(data, type, _client_id, _request_id, show_log=True):
| | |     """
| | |     Publish a result by putting it on the module-level result queue.
| | |
| | |     :param data: payload put on the queue as-is (callers in this file pass JSON strings)
| | |     :param type: not used by this function
| | |     :param _client_id: not used by this function
| | |     :param _request_id: not used by this function
| | |     :param show_log: when truthy the payload is also written to the debug log
| | |     """
| | |     if show_log:
| | |         async_log_util.debug(logger_local_huaxin_trade_debug, f"回调返回内容:{data}")
| | |     # put_nowait() does not block; the queue must have been set with set_result_read_queue() first.
| | |     __queue_result.put_nowait(data)
| | | |
| | | |
| | | # 交易反馈回调 |
| | | def __traderapi_callback(type, req_id, data): |
| | | async_log_util.info(logger_local_huaxin_trade_debug, "回调:type-{} req_id-{}", type, req_id) |
| | | key = req_id |
| | | if type == TYPE_ORDER or type == TYPE_CANCEL_ORDER: |
| | | key = data["sinfo"] |
| | | try: |
| | | if req_rid_dict and key in req_rid_dict: |
| | | temp_params = req_rid_dict.pop(key) |
| | | client_id, request_id = temp_params[0], temp_params[1] |
| | | # 本地订单号-系统订单号映射 |
| | | if len(temp_params) >= 4 and type == TYPE_ORDER: |
| | | order_ref = temp_params[3] |
| | | data["orderRef"] = order_ref |
| | | |
| | | async_log_util.info(logger_local_huaxin_trade_debug, "API回调 request_id-{}", request_id) |
| | | # 测试 |
| | | # send_response( |
| | | # json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | # "request_id": request_id}), type, client_id, request_id, temp_params[2]) |
| | | send_response( |
| | | json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | "request_id": request_id}), type, client_id, request_id) |
| | | |
| | | async_log_util.info(logger_local_huaxin_trade_debug, "API回调结束 req_id-{} request_id-{}", req_id, request_id) |
| | | else: |
| | | async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id) |
| | | send_response( |
| | | json.dumps({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}}), |
| | | type, |
| | | None, |
| | | req_id) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | # 采用异步回调 |
| | | def traderapi_callback(type, req_id, data):
| | |     """
| | |     Public callback entry point; forwards its arguments unchanged to __traderapi_callback.
| | |     The call is made directly in the calling thread.
| | |
| | |     :param type: one of the TYPE_* codes
| | |     :param req_id: request id reported by the API
| | |     :param data: callback payload
| | |     """
| | |     __traderapi_callback(type, req_id, data)
| | | |
| | | |
| | | # Pending client requests waiting for an API callback.
| | | # Key: the sinfo of an order / cancel request, or the request id of a query.
| | | # Value: tuple (client_id, request_id, sk), followed by order_ref for buy orders.
| | | req_rid_dict = {}
| | | |
| | | |
| | | class TradeSimpleApi:
| | |     """Thin wrapper that builds request fields and sends them through the trader API."""
| | |     # Request id counter; incremented before every request is sent
| | |     req_id = 0
| | |     # sinfo values already used per operation; they are class attributes and are
| | |     # consulted to reject a second submission with the same sinfo
| | |     __buy_sinfo_set = set()
| | |     __sell_sinfo_set = set()
| | |     __cancel_buy_sinfo_set = set()
| | |     __cancel_sell_sinfo_set = set()
| | |
| | |     def __init__(self, UserID, Password, SZSE_ShareHolderID, SSE_ShareHolderID, api: traderapi.CTORATstpTraderApi):
| | |         """
| | |         Store the login credentials, shareholder ids and the API handle on the instance.
| | |
| | |         :param UserID: user id used by login()
| | |         :param Password: password used by login()
| | |         :param SZSE_ShareHolderID: Shenzhen investor (shareholder) code
| | |         :param SSE_ShareHolderID: Shanghai investor (shareholder) code
| | |         :param api: trader API object the requests are sent through
| | |         """
| | |         self.UserID = UserID
| | |         self.Password = Password
| | |         self.SZSE_ShareHolderID = SZSE_ShareHolderID
| | |         self.SSE_ShareHolderID = SSE_ShareHolderID
| | |         self.api = api
| | | |
| | | |
| | | @classmethod |
| | | def set_login_info(cls, session_id, front_id): |
| | | cls.__session_id = session_id |
| | | cls.__front_id = front_id |
| | | |
| | | # sinfo char(32) |
| | |     def buy(self, code, count, price, sinfo, order_ref, shadow_price=None):
| | |         """
| | |         Place a limit buy order, optionally followed by a 100-share "shadow" order that is
| | |         cancelled again after a short delay.
| | |
| | |         :param code: security code
| | |         :param count: order volume
| | |         :param price: limit price
| | |         :param sinfo: caller-defined tag sent with the order; a second call with the same tag is rejected
| | |         :param order_ref: order reference sent with the order
| | |         :param shadow_price: when truthy (and order_ref is truthy) a shadow order is placed at this price
| | |         :return: return code of the last ReqOrderInsert call; None when ENABLE_ORDER is falsy
| | |         :raises Exception: on a duplicate sinfo or a non-zero return code from the API
| | |         """
| | |         if not ENABLE_ORDER:
| | |             return
| | |         if sinfo in self.__buy_sinfo_set:
| | |             raise Exception(f'下单请求已经提交:{sinfo}')
| | |         async_log_util.info(logger_trade, f"{code}华鑫本地真实下单开始")
| | |         async_log_util.info(logger_local_huaxin_trade_debug,
| | |                             f"进入买入方法:code-{code} sinfo-{sinfo} order_ref-{order_ref}")
| | |         # The tag is recorded before the request is sent, so it stays recorded even if sending fails.
| | |         self.__buy_sinfo_set.add(sinfo)
| | |         self.req_id += 1
| | |         # Order-insert request
| | |         req_field = traderapi.CTORATstpInputOrderField()
| | |         # TORA_TSTP_EXD_COMM(0): common (internal use)
| | |         # TORA_TSTP_EXD_SSE(1): Shanghai Stock Exchange
| | |         # TORA_TSTP_EXD_SZSE(2): Shenzhen Stock Exchange
| | |         # TORA_TSTP_EXD_HK(3): Hong Kong Stock Exchange
| | |         # TORA_TSTP_EXD_BSE(4): Beijing Stock Exchange
| | |         # No exchange / shareholder id is set when the market type is neither SZSE nor SSE.
| | |         if tool.get_market_type(code) == tool.MARKET_TYPE_SZSE:
| | |             req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE
| | |             req_field.ShareholderID = self.SZSE_ShareHolderID
| | |         elif tool.get_market_type(code) == tool.MARKET_TYPE_SSE:
| | |             req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE
| | |             req_field.ShareholderID = self.SSE_ShareHolderID
| | |
| | |         # Security code
| | |         req_field.SecurityID = code
| | |         req_field.Direction = traderapi.TORA_TSTP_D_Buy
| | |         req_field.VolumeTotalOriginal = count
| | |         req_field.SInfo = sinfo
| | |         req_field.OrderRef = order_ref
| | |
| | |         '''
| | |         上交所支持限价指令和最优五档剩撤、最优五档剩转限两种市价指令,对于科创板额外支持本方最优和对手方最优两种市价指令和盘后固定价格申报指令
| | |         深交所支持限价指令和立即成交剩余撤销、全额成交或撤销、本方最优、对手方最优和最优五档剩撤五种市价指令
| | |         限价指令和上交所科创板盘后固定价格申报指令需填写报单价格,其它市价指令无需填写报单价格
| | |         以下以上交所限价指令为例,其它指令参考开发指南相关说明填写OrderPriceType、TimeCondition和VolumeCondition三个字段:
| | |         '''
| | |         # Limit-price order
| | |         req_field.LimitPrice = price
| | |         req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_LimitPrice
| | |         req_field.TimeCondition = traderapi.TORA_TSTP_TC_GFD
| | |         req_field.VolumeCondition = traderapi.TORA_TSTP_VC_AV
| | |
| | |         '''
| | |         OrderRef为报单引用,类型为整型,该字段报单时为选填
| | |         若不填写,则系统会为每笔报单自动分配一个报单引用
| | |         若填写,则需保证同一个TCP会话下报单引用严格单调递增,不要求连续递增,至少需从1开始编号
| | |         '''
| | |         # req_field.OrderRef = 1
| | |
| | |         '''
| | |         InvestorID为选填,若填写则需保证填写正确
| | |         Operway为委托方式,根据券商要求填写,无特殊说明置空即可
| | |         终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在报单回报和查询报单时返回给终端
| | |         '''
| | |         # req_field.SInfo = 'sinfo'
| | |         # req_field.IInfo = 123
| | |
| | |         '''
| | |         其它字段置空
| | |         '''
| | |         # Original note: "send a message to L2"
| | |
| | |         ret = self.api.ReqOrderInsert(req_field, self.req_id)
| | |         if ret != 0:
| | |             raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
| | |         # Original note: volume listening is permanent, so nothing has to be set up per order
| | |         # if queue_other_w_l2_r is not None:
| | |         #     queue_other_w_l2_r.put_nowait(
| | |         #         json.dumps({"type": "listen_volume", "data": {"code": code,
| | |         #                                                       "volume": count}}).encode(
| | |         #             'utf-8'))
| | |         async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束")
| | |         # -------------------------------- shadow order --------------------------------
| | |         if shadow_price:
| | |             if order_ref:
| | |                 # Place the shadow order: the same request object is reused with a new price,
| | |                 # tag, reference (order_ref + 1) and a fixed volume of 100
| | |                 shadow_order_ref = order_ref + 1
| | |                 shadow_sinfo = f"s_b_{order_ref}"
| | |                 req_field.LimitPrice = shadow_price
| | |                 req_field.SInfo = shadow_sinfo
| | |                 req_field.OrderRef = shadow_order_ref
| | |                 req_field.VolumeTotalOriginal = 100
| | |                 self.req_id += 1
| | |                 ret = self.api.ReqOrderInsert(req_field, self.req_id)
| | |                 if ret != 0:
| | |                     raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
| | |                 # Cancel the shadow order again
| | |                 # NOTE(review): shadow_cancel_order_ref is computed but never used — confirm it can go
| | |                 shadow_cancel_order_ref = shadow_order_ref + 1
| | |                 # Wait 50 ms for codes starting with "00", 200 ms for all other codes
| | |                 # NOTE(review): this prefix test differs from the tool.get_market_type() check above — confirm
| | |                 delay_s = 0.05 if code.find("00") == 0 else 0.2
| | |                 self.cancel_buy(code, f"s_c_{shadow_order_ref}", order_sys_id=None,
| | |                                 order_ref=shadow_order_ref,
| | |                                 order_action_ref=None, delay_s=delay_s)
| | |
| | |         return ret
| | | |
| | | # 撤买 |
| | | def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None, order_action_ref=None, delay_s=0.0): |
| | | if delay_s > 0: |
| | | time.sleep(delay_s) |
| | | if sinfo in self.__cancel_buy_sinfo_set: |
| | | raise Exception(f'撤单请求已经提交:{sinfo}') |
| | | async_log_util.info(logger_local_huaxin_trade_debug, |
| | | f"进入撤单方法:code-{code} order_sys_id-{order_sys_id} order_ref-{order_ref} sinfo-{sinfo}") |
| | | self.__cancel_buy_sinfo_set.add(sinfo) |
| | | self.req_id += 1 |
| | | # 请求撤单 |
| | | req_field = traderapi.CTORATstpInputOrderActionField() |
| | | if code.find('00') == 0: |
| | | req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE |
| | | elif code.find('60') == 0: |
| | | req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE |
| | | |
| | | req_field.ActionFlag = traderapi.TORA_TSTP_AF_Delete |
| | | |
| | | # 撤单支持以下两种方式定位原始报单: |
| | | # (1)报单引用方式 |
| | | # req_field.FrontID = self.__front_id |
| | | # req_field.SessionID = self.__session_id |
| | | # req_field.OrderRef = 1 |
| | | # (2)系统报单编号方式 |
| | | if order_sys_id: |
| | | req_field.OrderSysID = order_sys_id |
| | | elif order_ref is not None: |
| | | req_field.OrderRef = order_ref |
| | | req_field.SessionID = self.__session_id |
| | | req_field.FrontID = self.__front_id |
| | | if order_action_ref: |
| | | req_field.OrderActionRef = order_action_ref |
| | | |
| | | # OrderActionRef报单操作引用,用法同报单引用,可根据需要选填 |
| | | |
| | | ''' |
| | | 终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在查询撤单时返回给终端 |
| | | ''' |
| | | req_field.SInfo = sinfo |
| | | # req_field.IInfo = 123 |
| | | |
| | | ''' |
| | | 委托方式字段根据券商要求填写,无特殊说明置空即可 |
| | | 其它字段置空 |
| | | ''' |
| | | ret = self.api.ReqOrderAction(req_field, self.req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqOrderAction fail, ret[%d]' % ret) |
| | | return |
| | | |
| | | # 卖 |
| | | def sell(self, code, count, price, price_type, sinfo, order_ref=None): |
| | | if sinfo in self.__sell_sinfo_set: |
| | | raise Exception(f'下单请求已经提交:{sinfo}') |
| | | self.__sell_sinfo_set.add(sinfo) |
| | | self.req_id += 1 |
| | | # 请求报单 |
| | | req_field = traderapi.CTORATstpInputOrderField() |
| | | # TORA_TSTP_EXD_COMM(0): 通用(内部使用) |
| | | # TORA_TSTP_EXD_SSE(1): 上海交易所 |
| | | # TORA_TSTP_EXD_SZSE(2): 深圳交易所 |
| | | # TORA_TSTP_EXD_HK(3): 香港交易所 |
| | | # TORA_TSTP_EXD_BSE(4): 北京证券交易所 |
| | | if tool.get_market_type(code) == tool.MARKET_TYPE_SZSE: |
| | | req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE |
| | | req_field.ShareholderID = self.SZSE_ShareHolderID |
| | | elif tool.get_market_type(code) == tool.MARKET_TYPE_SSE: |
| | | req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE |
| | | req_field.ShareholderID = self.SSE_ShareHolderID |
| | | |
| | | # 证券代码 |
| | | req_field.SecurityID = code |
| | | req_field.Direction = traderapi.TORA_TSTP_D_Sell |
| | | req_field.VolumeTotalOriginal = count |
| | | req_field.SInfo = sinfo |
| | | |
| | | ''' |
| | | 上交所支持限价指令和最优五档剩撤、最优五档剩转限两种市价指令,对于科创板额外支持本方最优和对手方最优两种市价指令和盘后固定价格申报指令 |
| | | 深交所支持限价指令和立即成交剩余撤销、全额成交或撤销、本方最优、对手方最优和最优五档剩撤五种市价指令 |
| | | 限价指令和上交所科创板盘后固定价格申报指令需填写报单价格,其它市价指令无需填写报单价格 |
| | | 以下以上交所限价指令为例,其它指令参考开发指南相关说明填写OrderPriceType、TimeCondition和VolumeCondition三个字段: |
| | | ''' |
| | | |
| | | printlog('卖 price', price, price_type) |
| | | if price and price > 0: |
| | | req_field.LimitPrice = price |
| | | if price_type == 1: |
| | | req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_AnyPrice |
| | | elif price_type == 2: |
| | | req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_LimitPrice |
| | | elif price_type == 3: |
| | | req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_BestPrice |
| | | elif price_type == 4: |
| | | req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_FixPrice |
| | | elif price_type == 5: |
| | | req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_FiveLevelPrice |
| | | elif price_type == 6: |
| | | req_field.OrderPriceType = traderapi.TORA_TSTP_OPT_HomeBestPrice |
| | | |
| | | req_field.TimeCondition = traderapi.TORA_TSTP_TC_GFD |
| | | req_field.VolumeCondition = traderapi.TORA_TSTP_VC_AV |
| | | if order_ref: |
| | | req_field.OrderRef = order_ref |
| | | |
| | | ''' |
| | | OrderRef为报单引用,类型为整型,该字段报单时为选填 |
| | | 若不填写,则系统会为每笔报单自动分配一个报单引用 |
| | | 若填写,则需保证同一个TCP会话下报单引用严格单调递增,不要求连续递增,至少需从1开始编号 |
| | | ''' |
| | | # req_field.OrderRef = 1 |
| | | |
| | | ''' |
| | | InvestorID为选填,若填写则需保证填写正确 |
| | | Operway为委托方式,根据券商要求填写,无特殊说明置空即可 |
| | | 终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在报单回报和查询报单时返回给终端 |
| | | ''' |
| | | # req_field.SInfo = 'sinfo' |
| | | # req_field.IInfo = 123 |
| | | |
| | | ''' |
| | | 其它字段置空 |
| | | ''' |
| | | ret = self.api.ReqOrderInsert(req_field, self.req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqOrderInsert fail, ret[%d]' % ret) |
| | | return |
| | | |
| | | # 撤卖 |
| | | def cancel_sell(self, code, order_sys_id, sinfo): |
| | | if sinfo in self.__cancel_sell_sinfo_set: |
| | | raise Exception(f'撤单请求已经提交:{sinfo}') |
| | | self.__cancel_sell_sinfo_set.add(sinfo) |
| | | self.req_id += 1 |
| | | # 请求撤单 |
| | | req_field = traderapi.CTORATstpInputOrderActionField() |
| | | if code.find('00') == 0: |
| | | req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SZSE |
| | | elif code.find('60') == 0: |
| | | req_field.ExchangeID = traderapi.TORA_TSTP_EXD_SSE |
| | | |
| | | req_field.ActionFlag = traderapi.TORA_TSTP_AF_Delete |
| | | |
| | | # 撤单支持以下两种方式定位原始报单: |
| | | # (1)报单引用方式 |
| | | # req_field.FrontID = self.__front_id |
| | | # req_field.SessionID = self.__session_id |
| | | # req_field.OrderRef = 1 |
| | | # (2)系统报单编号方式 |
| | | req_field.OrderSysID = order_sys_id |
| | | |
| | | # OrderActionRef报单操作引用,用法同报单引用,可根据需要选填 |
| | | |
| | | ''' |
| | | 终端自定义字段,终端可根据需要填写如下字段的值,该字段值不会被柜台系统修改,在查询撤单时返回给终端 |
| | | ''' |
| | | req_field.SInfo = sinfo |
| | | # req_field.IInfo = 123 |
| | | |
| | | ''' |
| | | 委托方式字段根据券商要求填写,无特殊说明置空即可 |
| | | 其它字段置空 |
| | | ''' |
| | | ret = self.api.ReqOrderAction(req_field, self.req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqOrderAction fail, ret[%d]' % ret) |
| | | return |
| | | |
| | | # 查询当日可撤销的委托 |
| | | def list_delegate_orders(self, is_cancel): |
| | | self.req_id += 1 |
| | | req_id = self.req_id |
| | | req_field = traderapi.CTORATstpQryOrderField() |
| | | # 以下字段不填表示不设过滤条件,即查询所有报单 |
| | | # req_field.SecurityID = '600000' |
| | | req_field.InsertTimeStart = '09:15:00' |
| | | req_field.InsertTimeEnd = '15:00:00' |
| | | # IsCancel字段填1表示只查询可撤报单 |
| | | if is_cancel: |
| | | req_field.IsCancel = 1 |
| | | # req_field.SInfo = sinfo |
| | | ret = self.api.ReqQryOrder(req_field, req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqQryOrder fail, ret[%d]' % ret) |
| | | return req_id |
| | | |
| | | # 查询当日成交的订单 |
| | | def list_traded_orders(self): |
| | | self.req_id += 1 |
| | | req_id = self.req_id |
| | | req_field = traderapi.CTORATstpQryTradeField() |
| | | ret = self.api.ReqQryTrade(req_field, req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqQryTrade fail, ret[%d]' % ret) |
| | | return req_id |
| | | |
| | | # 查询持仓 |
| | | def list_positions(self): |
| | | self.req_id += 1 |
| | | req_id = self.req_id |
| | | req_field = traderapi.CTORATstpQryPositionField() |
| | | ret = self.api.ReqQryPosition(req_field, req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqQryPosition fail, ret[%d]' % ret) |
| | | return req_id |
| | | |
| | | # 查询资金账户 |
| | | def get_money_account(self): |
| | | self.req_id += 1 |
| | | req_id = self.req_id |
| | | req_field = traderapi.CTORATstpQryTradingAccountField() |
| | | req_field.CurrencyID = traderapi.TORA_TSTP_CID_CNY |
| | | ret = self.api.ReqQryTradingAccount(req_field, req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqQryTradingAccount fail, ret[%d]' % ret) |
| | | return req_id |
| | | |
| | | def login(self): |
| | | # 请求登录 |
| | | login_req = traderapi.CTORATstpReqUserLoginField() |
| | | |
| | | # 支持以用户代码、资金账号和股东账号方式登录 |
| | | # (1)以用户代码方式登录 |
| | | login_req.LogInAccount = self.UserID |
| | | login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_UserID |
| | | # (2)以资金账号方式登录 |
| | | # login_req.DepartmentID = DepartmentID |
| | | # login_req.LogInAccount = AccountID |
| | | # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_AccountID |
| | | # (3)以上海股东账号方式登录 |
| | | # login_req.LogInAccount = SSE_ShareHolderID |
| | | # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_SHAStock |
| | | # (4)以深圳股东账号方式登录 |
| | | # login_req.LogInAccount = SZSE_ShareHolderID |
| | | # login_req.LogInAccountType = traderapi.TORA_TSTP_LACT_SZAStock |
| | | |
| | | # 支持以密码和指纹(移动设备)方式认证 |
| | | # (1)密码认证 |
| | | # 密码认证时AuthMode可不填 |
| | | # login_req.AuthMode = traderapi.TORA_TSTP_AM_Password |
| | | login_req.Password = self.Password |
| | | # (2)指纹认证 |
| | | # 非密码认证时AuthMode必填 |
| | | # login_req.AuthMode = traderapi.TORA_TSTP_AM_FingerPrint |
| | | # login_req.DeviceID = '03873902' |
| | | # login_req.CertSerial = '9FAC09383D3920CAEFF039' |
| | | |
| | | # 终端信息采集 |
| | | # UserProductInfo填写终端名称 |
| | | login_req.UserProductInfo = 'jiabei' |
| | | # 按照监管要求填写终端信息 |
| | | login_req.TerminalInfo = 'PC;IIP=123.112.154.118;IPORT=50361;LIP=192.168.118.107;MAC=54EE750B1713FCF8AE5CBD58;HD=TF655AY91GHRVL' |
| | | # 以下内外网IP地址若不填则柜台系统自动采集,若填写则以终端填值为准报送 |
| | | # login_req.MacAddress = '5C-87-9C-96-F3-E3' |
| | | # login_req.InnerIPAddress = '10.0.1.102' |
| | | # login_req.OuterIPAddress = '58.246.43.50' |
| | | |
| | | TradeSimpleApi.req_id += 1 |
| | | ret = self.api.ReqUserLogin(login_req, TradeSimpleApi.req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqUserLogin fail, ret[%d]' % ret) |
| | | |
| | | |
| | | class MyTradeActionCallback(TradeActionCallback):
| | |     """Handles trade commands by calling the matching TradeSimpleApi method."""
| | |     # Thread pool (class attribute) on which buy and cancel-buy requests are executed
| | |     trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)
| | |
| | |     def __init__(self, UserID, Password, SZSE_ShareHolderID, SSE_ShareHolderID, api: traderapi.CTORATstpTraderApi):
| | |         """
| | |         Create the TradeSimpleApi used by all handlers.
| | |
| | |         :param UserID: forwarded to TradeSimpleApi
| | |         :param Password: forwarded to TradeSimpleApi
| | |         :param SZSE_ShareHolderID: forwarded to TradeSimpleApi
| | |         :param SSE_ShareHolderID: forwarded to TradeSimpleApi
| | |         :param api: trader API object, forwarded to TradeSimpleApi
| | |         """
| | |         self.__tradeSimpleApi = TradeSimpleApi(UserID, Password, SZSE_ShareHolderID, SSE_ShareHolderID, api)
| | | |
| | | |
| | | def OnTrade(self, client_id, request_id, sk, type_, data): |
| | | if type_ == 1: |
| | | async_log_util.info(logger_local_huaxin_trade_debug, |
| | | f"\n---------------------\n请求下单:client_id-{client_id} request_id-{request_id} data:{data}") |
| | | # 下单 |
| | | # 1-买 2-卖 |
| | | direction = data["direction"] |
| | | code = data["code"] |
| | | volume = data["volume"] |
| | | price = data["price"] |
| | | sinfo = data["sinfo"] |
| | | order_ref = data.get("order_ref") |
| | | shadow_price = data.get("shadow_price") |
| | | blocking = data.get("blocking") |
| | | |
| | | if direction == 1: |
| | | async_log_util.info(logger_trade, f"{code}华鑫本地开始下单") |
| | | # 买 |
| | | try: |
| | | if blocking: |
| | | req_rid_dict[sinfo] = (client_id, request_id, sk, order_ref) |
| | | # threading.Thread(target=lambda: self.__tradeSimpleApi.buy(code, volume, price, sinfo, order_ref), |
| | | # daemon=True).start() |
| | | self.trade_thread_pool.submit(self.__tradeSimpleApi.buy, code, volume, price, sinfo, order_ref, |
| | | shadow_price) |
| | | |
| | | async_log_util.info(logger_trade, f"{code}华鑫本地下单线程结束") |
| | | except Exception as e: |
| | | send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id, |
| | | request_id) |
| | | async_log_util.info(logger_local_huaxin_trade_debug, |
| | | f"买入结束:code-{code} sinfo-{sinfo}") |
| | | elif direction == 2: |
| | | try: |
| | | price_type = data["price_type"] |
| | | if blocking: |
| | | req_rid_dict[sinfo] = (client_id, request_id, sk) |
| | | self.__tradeSimpleApi.sell(code, volume, price, price_type, sinfo, order_ref) |
| | | printlog("sell", req_rid_dict) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id, |
| | | request_id) |
| | | |
| | | elif type_ == 2: |
| | | async_log_util.info(logger_local_huaxin_trade_debug, |
| | | f"\n---------------------\n请求撤单:client_id-{client_id} request_id-{request_id} data-{data}") |
| | | # 撤单 |
| | | direction = data["direction"] |
| | | code = data["code"] |
| | | orderSysID = data.get("orderSysID") |
| | | orderRef = data.get("orderRef") |
| | | orderActionRef = data.get("orderActionRef") |
| | | sinfo = data["sinfo"] |
| | | if direction == 1: |
| | | # 撤买 |
| | | try: |
| | | if not orderSysID and orderRef is None: |
| | | raise Exception("没有找到系统订单号或者报单引用") |
| | | req_rid_dict[sinfo] = (client_id, request_id, sk) |
| | | self.trade_thread_pool.submit( |
| | | lambda: self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID, |
| | | order_ref=orderRef, order_action_ref=orderActionRef)) |
| | | async_log_util.info(logger_local_huaxin_trade_debug, |
| | | f"撤单结束:code-{code} order_sys_id-{orderSysID} sinfo-{sinfo}") |
| | | except Exception as e: |
| | | send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_CANCEL_ORDER, client_id, |
| | | request_id) |
| | | |
| | | elif direction == 2: |
| | | # 撤卖 |
| | | try: |
| | | req_rid_dict[sinfo] = (client_id, request_id, sk) |
| | | self.__tradeSimpleApi.cancel_sell(code, orderSysID, sinfo) |
| | | except Exception as e: |
| | | send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_CANCEL_ORDER, client_id, |
| | | request_id) |
| | | |
| | | def OnDealList(self, client_id, request_id, sk): |
| | | async_log_util.info(logger_local_huaxin_trade_debug, f"请求成交列表:client_id-{client_id} request_id-{request_id}") |
| | | try: |
| | | # printlog("开始请求成交列表") |
| | | req_id = self.__tradeSimpleApi.list_traded_orders() |
| | | req_rid_dict[req_id] = (client_id, request_id, sk) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | SendResponseSkManager.send_error_response("common", request_id, client_id, str(e)) |
| | | |
| | | def OnDelegateList(self, client_id, request_id, sk, is_cancel): |
| | | async_log_util.info(logger_local_huaxin_trade_debug, f"请求委托列表:client_id-{client_id} request_id-{request_id}") |
| | | try: |
| | | req_id = self.__tradeSimpleApi.list_delegate_orders(is_cancel) |
| | | req_rid_dict[req_id] = (client_id, request_id, sk) |
| | | except Exception as e: |
| | | send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id, |
| | | request_id) |
| | | |
| | | def OnMoney(self, client_id, request_id, sk): |
| | | async_log_util.info(logger_local_huaxin_trade_debug, f"请求账户:client_id-{client_id} request_id-{request_id}") |
| | | try: |
| | | req_id = self.__tradeSimpleApi.get_money_account() |
| | | req_rid_dict[req_id] = (client_id, request_id, sk) |
| | | except Exception as e: |
| | | send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id, |
| | | request_id) |
| | | |
| | | def OnPositionList(self, client_id, request_id, sk): |
| | | async_log_util.info(logger_local_huaxin_trade_debug, f"请求持仓:client_id-{client_id} request_id-{request_id}") |
| | | try: |
| | | req_id = self.__tradeSimpleApi.list_positions() |
| | | req_rid_dict[req_id] = (client_id, request_id, sk) |
| | | except Exception as e: |
| | | send_response(json.dumps({"code": 1, "msg": str(e)}), "common", client_id, |
| | | request_id) |
| | | |
| | | def OnTest(self, client_id, request_id, data, sk): |
| | | send_response( |
| | | json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id, |
| | | "request_id": request_id}), type, client_id, request_id, show_log=False, sk=sk) |
New file |
| | |
| | | """ |
| | | 异步日志管理器 |
| | | """ |
| | | import logging |
| | | import queue |
| | | import threading |
| | | import time |
| | | |
| | | from log_module.log import printlog, logger_system, logger_debug |
| | | from utils import tool |
| | | |
| | | |
| | | class AsyncLogManager: |
| | | |
| | | def __init__(self): |
| | | self.__log_queue = queue.Queue() |
| | | |
| | | def __add_log(self, logger, method, *args): |
| | | self.__log_queue.put_nowait((logger, time.time(), method, args)) |
| | | |
| | | def add_log(self, data): |
| | | self.__log_queue.put_nowait(data) |
| | | |
| | | def debug(self, logger, *args): |
| | | self.__add_log(logger, "debug", *args) |
| | | |
| | | def info(self, logger, *args): |
| | | self.__add_log(logger, "info", *args) |
| | | |
| | | def warning(self, logger, *args): |
| | | self.__add_log(logger, "warning", *args) |
| | | |
| | | def error(self, logger, *args): |
| | | self.__add_log(logger, "error", *args) |
| | | |
| | | def exception(self, logger, *args): |
| | | self.__add_log(logger, "exception", *args) |
| | | |
| | | # 运行同步日志 |
| | | def run_sync(self, add_to_common_log=False): |
| | | printlog("run_sync", add_to_common_log) |
| | | logger_system.info(f"run_sync 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | # val = self.__log_queue.get() |
| | | try: |
| | | val = self.__log_queue.get() |
| | | if not add_to_common_log: |
| | | time_s = val[1] |
| | | cmd = val[2] |
| | | method = getattr(val[0], cmd) |
| | | d = list(val[3]) |
| | | d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:6]}] " + d[0] |
| | | d = tuple(d) |
| | | method(*d) |
| | | else: |
| | | _common_log.add_log(val) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | # Separate manager instance, presumably for L2 data logs (by name — confirm against callers)
| | | l2_data_log = AsyncLogManager()
| | |
| | | # Separate manager instance, presumably for huaxin L2 logs (by name — confirm against callers)
| | | huaxin_l2_log = AsyncLogManager()
| | |
| | | # Shared manager behind the module-level debug/info/warning/error/exception helpers
| | | _common_log = AsyncLogManager()
| | | |
| | | |
| | | def debug(logger, *args):
| | |     """Queue a ``logger.debug(*args)`` call on the shared log manager."""
| | |     _common_log.debug(logger, *args)
| | |
| | |
| | | def info(logger, *args):
| | |     """Queue a ``logger.info(*args)`` call on the shared log manager."""
| | |     _common_log.info(logger, *args)
| | |
| | |
| | | def warning(logger, *args):
| | |     """Queue a ``logger.warning(*args)`` call on the shared log manager."""
| | |     _common_log.warning(logger, *args)
| | |
| | |
| | | def error(logger, *args):
| | |     """Queue a ``logger.error(*args)`` call on the shared log manager."""
| | |     _common_log.error(logger, *args)
| | |
| | |
| | | def exception(logger, *args):
| | |     """Queue a ``logger.exception(*args)`` call on the shared log manager."""
| | |     _common_log.exception(logger, *args)
| | | |
| | | |
| | | # 运行同步日志 |
| | | def run_sync():
| | |     """
| | |     Log the current thread id, then drain the shared log manager's queue.
| | |     Blocks forever, so it is meant to be the target of a dedicated thread.
| | |     """
| | |     logger_system.info(f"async_log 线程ID:{tool.get_thread_id()}")
| | |     _common_log.run_sync()
| | | |
| | | |
| | | if __name__ == "__main__":
| | |     # Manual check: queue one record on a private manager, drain it in a daemon thread,
| | |     # then drain the shared manager in the main thread (this call blocks forever).
| | |     # info(logger_debug, "*-{}", "test")
| | |     asyncLogManager = AsyncLogManager()
| | |     asyncLogManager.info(logger_debug, "测试123")
| | |     threading.Thread(target=lambda: asyncLogManager.run_sync(), daemon=True).start()
| | |     time.sleep(1)
| | |     # info(logger_debug, "002375")
| | |     run_sync()
New file |
| | |
| | | """ |
| | | 日志 |
| | | """ |
| | | import logging |
| | | import os |
| | | import sys |
| | | from loguru import logger |
| | | |
| | | import constant |
| | | |
| | | |
class MyLogger:
    """Registers the project's loguru sinks and hands out name-bound loggers.

    Every sink only accepts records whose ``extra["name"]`` equals that sink's
    routing name; ``get_logger`` returns a logger bound to such a name.
    """

    def __init__(self):
        """Remove loguru's current handlers, then add all sinks in a fixed order."""

        def only(expected):
            # Factory so that each filter captures its own routing name.
            return lambda record: record["extra"].get("name") == expected

        def add_files(resolve_path, rows):
            # File sinks: rotated at 00:00, old files zip-compressed, enqueue=True.
            for dir_name, log_name, routing_name in rows:
                logger.add(resolve_path(dir_name, log_name), filter=only(routing_name),
                           rotation="00:00", compression="zip", enqueue=True)

        def add_console(routing_name, **options):
            # Console sink on sys.stdout for a single routing name.
            logger.add(sys.stdout, filter=only(routing_name), **options)

        logger.remove()

        add_files(self.get_path, (
            ("trade", "trade_gui", "trade_gui"),
            ("trade", "trade", "trade"),
            ("trade", "delegate", "delegate"),
            ("l2", "l2_error", "l2_error"),
            ("l2", "l2_process", "l2_process"),
            ("l2", "l2_process_time", "l2_process_time"),
            ("l2", "l2_trade", "l2_trade"),
            ("l2", "l2_data", "l2_data"),
            ("l2", "l2_latest_data", "l2_latest_data"),
            ("mysql", "mysql_debug", "mysql_debug"),
        ))

        # Shown on the console.
        add_console("info", enqueue=True)

        add_files(self.get_path, (
            ("l2", "l2_trade_cancel", "l2_trade_cancel"),
            ("l2", "cancel/s_cancel", "s_cancel"),
            ("l2", "cancel/h_cancel", "h_cancel"),
            ("l2", "cancel/l_cancel", "l_cancel"),
            ("l2", "cancel/g_cancel", "g_cancel"),
            ("l2", "cancel/d_cancel", "d_cancel"),
            ("l2", "cancel/f_cancel", "f_cancel"),
            ("l2", "l2_trade_buy", "l2_trade_buy"),
            ("l2", "l2_big_data", "l2_big_data"),
            ("l2", "l2_trade_queue", "l2_trade_queue"),
            ("l2", "l2_trade_buy_queue", "l2_trade_buy_queue"),
            ("l2", "l2_trade_buy_progress", "l2_trade_buy_progress"),
            ("l2", "l2_real_place_order_position", "l2_real_place_order_position"),
            # File is l2_subscript, routing name is l2_codes_subscript.
            ("l2", "l2_subscript", "l2_codes_subscript"),
            ("l2", "l2_market_sell", "l2_market_sell"),
            ("l2", "l2_not_buy_reasons", "l2_not_buy_reasons"),
            ("juejin", "juejin_tick", "juejin_tick"),
            ("juejin", "juejin_trade", "juejin_trade"),
            ("juejin", "huaxin_trade", "huaxin_trade"),
            ("ths", "code_operate", "code_operate"),
        ))

        # Shown on the console.
        add_console("print", enqueue=True)

        add_files(self.get_path, (
            ("device", "device", "device"),
            ("system", "system", "system"),
            ("ths", "buy_1_volumn", "buy_1_volumn"),
            ("ths", "buy_1_volumn_record", "buy_1_volumn_record"),
            ("ths", "trade_queue_price_info", "trade_queue_price_info"),
            ("ths", "day_volumn", "day_volumn"),
            ("ths", "buy_win_distibute", "buy_win_distibute"),
            ("first_code", "first_code_record", "first_code_record"),
            ("debug", "debug", "debug"),
            ("debug", "request_api", "request_api"),
            ("trade", "trade_record", "trade_record"),
            ("trade", "position_api_request", "position_api_request"),
            ("score", "place_order_score", "place_order_score"),
            ("kpl", "kpl_limit_up_reason_change", "kpl_limit_up_reason_change"),
            ("kpl", "kpl_limit_up", "kpl_limit_up"),
            ("kpl", "kpl_debug", "kpl_debug"),
            ("kpl", "kpl_block_can_buy", "kpl_block_can_buy"),
            ("kpl", "kpl_open_limit_up", "kpl_open_limit_up"),
            # Market-watching ("kp") log.
            ("kp", "kp_msg", "kp_msg"),
            ("redis", "redis_debug", "redis_debug"),
            ("profile", "profile", "profile"),
        ))

        # ------------------------------ Huaxin logs ------------------------------
        add_files(self.get_hx_path, (
            ("l2", "transaction", "hx_l2_transaction"),
            ("l2", "transaction_sell_order", "hx_l2_transaction_sell_order"),
            ("l2", "transaction_desc", "hx_l2_transaction_desc"),
            ("l2", "orderdetail", "hx_l2_orderdetail"),
            ("l2", "marketdata", "hx_l2_market_data"),
            ("l2", "upload", "hx_l2_upload"),
            ("l2", "debug", "hx_l2_debug"),
            ("contact", "debug", "hx_contact_debug"),
            ("trade", "trade_callback", "hx_trade_callback"),
            ("trade", "debug", "hx_trade_debug"),
            ("trade", "trade_loop", "hx_trade_loop"),
        ))

        add_files(self.get_local_huaxin_path, (
            ("l2", "transaction", "local_huaxin_transaction"),
            ("l2", "orderdetail", "local_huaxin_orderdetail"),
            ("l2", "upload", "local_huaxin_upload"),
            ("l2", "error", "local_huaxin_error"),
            ("l2", "subscript", "local_huaxin_subscript"),
            ("l2", "market", "local_huaxin_l2_market"),
            ("contact", "debug", "local_huaxin_debug"),
            ("trade", "trade_debug", "local_huaxin_trade_debug"),
        ))

        # This routing name goes to the console (added without enqueue) and,
        # through the first row below, to a file as well.
        add_console("local_huaxin_l1_show_info")

        add_files(self.get_local_huaxin_path, (
            ("l1", "show_info", "local_huaxin_l1_show_info"),
            ("l1", "l1_for_trade", "local_huaxin_l1_trade_info"),
            ("l2", "g_cancel", "local_huaxin_g_cancel"),
            ("l2", "special_volume", "local_huaxin_l2_special_volume"),
            ("l2", "l2_buy_no", "local_huaxin_l2_buy_no"),
        ))

        add_files(self.get_path, (
            ("request", "request_debug", "request_debug"),
            ("request", "tuoguan_request_debug", "tuoguan_request_debug"),
        ))

    def _dated_path(self, family, dir_name, log_name):
        """Build ``<prefix>/<LOG_DIR>/<family>/<dir_name>/<log_name>`` plus the dated suffix.

        The ``{time:YYYY-MM-DD}`` placeholder is left literal in the result.
        """
        stem = f"{constant.get_path_prefix()}/{constant.LOG_DIR}/{family}/{dir_name}/{log_name}"
        return stem + ".{time:YYYY-MM-DD}.log"

    def get_path(self, dir_name, log_name):
        """Return the log path template under the ``gp`` directory."""
        return self._dated_path("gp", dir_name, log_name)

    def get_hx_path(self, dir_name, log_name):
        """Return the log path template under the ``huaxin`` directory."""
        return self._dated_path("huaxin", dir_name, log_name)

    def get_local_huaxin_path(self, dir_name, log_name):
        """Return the log path template under the ``huaxin_local`` directory."""
        return self._dated_path("huaxin_local", dir_name, log_name)

    def get_logger(self, log_name):
        """Return a loguru logger bound with ``name=log_name``."""
        return logger.bind(name=log_name)
| | | |
| | | |
# Single MyLogger instance; constructing it registers every sink.
__mylogger = MyLogger()

# Console loggers.
logger_print = __mylogger.get_logger("print")
logger_info = __mylogger.get_logger("info")

# Trade and L2 processing loggers.
logger_trade_gui = __mylogger.get_logger("trade_gui")
logger_trade = __mylogger.get_logger("trade")
logger_trade_delegate = __mylogger.get_logger("delegate")
logger_l2_error = __mylogger.get_logger("l2_error")
logger_l2_process = __mylogger.get_logger("l2_process")
logger_l2_process_time = __mylogger.get_logger("l2_process_time")
logger_l2_data = __mylogger.get_logger("l2_data")
logger_l2_latest_data = __mylogger.get_logger("l2_latest_data")
logger_l2_market_sell = __mylogger.get_logger("l2_market_sell")
logger_l2_not_buy_reasons = __mylogger.get_logger("l2_not_buy_reasons")

logger_l2_trade = __mylogger.get_logger("l2_trade")
logger_l2_trade_cancel = __mylogger.get_logger("l2_trade_cancel")
logger_l2_s_cancel = __mylogger.get_logger("s_cancel")
logger_l2_h_cancel = __mylogger.get_logger("h_cancel")
logger_l2_d_cancel = __mylogger.get_logger("d_cancel")
logger_l2_f_cancel = __mylogger.get_logger("f_cancel")
logger_l2_l_cancel = __mylogger.get_logger("l_cancel")
logger_l2_g_cancel = __mylogger.get_logger("g_cancel")
logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy")
logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue")
logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue")
logger_l2_trade_buy_progress = __mylogger.get_logger("l2_trade_buy_progress")
logger_real_place_order_position = __mylogger.get_logger("l2_real_place_order_position")
# Code-subscription log.
logger_l2_codes_subscript = __mylogger.get_logger("l2_codes_subscript")

logger_l2_big_data = __mylogger.get_logger("l2_big_data")
logger_juejin_tick = __mylogger.get_logger("juejin_tick")
logger_juejin_trade = __mylogger.get_logger("juejin_trade")
logger_huaxin_trade = __mylogger.get_logger("huaxin_trade")
logger_code_operate = __mylogger.get_logger("code_operate")
logger_device = __mylogger.get_logger("device")
logger_system = __mylogger.get_logger("system")

logger_buy_1_volumn = __mylogger.get_logger("buy_1_volumn")
logger_buy_1_volumn_record = __mylogger.get_logger("buy_1_volumn_record")
logger_trade_queue_price_info = __mylogger.get_logger("trade_queue_price_info")
logger_day_volumn = __mylogger.get_logger("day_volumn")
logger_buy_win_distibute = __mylogger.get_logger("buy_win_distibute")
logger_first_code_record = __mylogger.get_logger("first_code_record")
logger_debug = __mylogger.get_logger("debug")
logger_request_api = __mylogger.get_logger("request_api")
logger_trade_record = __mylogger.get_logger("trade_record")
logger_trade_position_api_request = __mylogger.get_logger("position_api_request")
logger_place_order_score = __mylogger.get_logger("place_order_score")
logger_kpl_limit_up_reason_change = __mylogger.get_logger("kpl_limit_up_reason_change")
logger_kpl_limit_up = __mylogger.get_logger("kpl_limit_up")
logger_kpl_debug = __mylogger.get_logger("kpl_debug")
logger_kpl_block_can_buy = __mylogger.get_logger("kpl_block_can_buy")
logger_kpl_open_limit_up = __mylogger.get_logger("kpl_open_limit_up")
logger_kp_msg = __mylogger.get_logger("kp_msg")
logger_redis_debug = __mylogger.get_logger("redis_debug")
logger_profile = __mylogger.get_logger("profile")
logger_mysql_debug = __mylogger.get_logger("mysql_debug")

# ------------------------------- Huaxin logs -------------------------------
hx_logger_l2_orderdetail = __mylogger.get_logger("hx_l2_orderdetail")
hx_logger_l2_transaction = __mylogger.get_logger("hx_l2_transaction")
hx_logger_l2_transaction_sell_order = __mylogger.get_logger("hx_l2_transaction_sell_order")
hx_logger_l2_transaction_desc = __mylogger.get_logger("hx_l2_transaction_desc")
hx_logger_l2_market_data = __mylogger.get_logger("hx_l2_market_data")
hx_logger_l2_upload = __mylogger.get_logger("hx_l2_upload")
hx_logger_l2_debug = __mylogger.get_logger("hx_l2_debug")
hx_logger_contact_debug = __mylogger.get_logger("hx_contact_debug")
hx_logger_trade_callback = __mylogger.get_logger("hx_trade_callback")
hx_logger_trade_debug = __mylogger.get_logger("hx_trade_debug")
hx_logger_trade_loop = __mylogger.get_logger("hx_trade_loop")

# ---------------------------- Huaxin local logs ----------------------------
logger_local_huaxin_l2_transaction = __mylogger.get_logger("local_huaxin_transaction")
logger_local_huaxin_l2_orderdetail = __mylogger.get_logger("local_huaxin_orderdetail")
logger_local_huaxin_l2_upload = __mylogger.get_logger("local_huaxin_upload")
logger_local_huaxin_l2_error = __mylogger.get_logger("local_huaxin_error")
logger_local_huaxin_l2_subscript = __mylogger.get_logger("local_huaxin_subscript")
logger_local_huaxin_l2_market = __mylogger.get_logger("local_huaxin_l2_market")
logger_local_huaxin_contact_debug = __mylogger.get_logger("local_huaxin_debug")
logger_local_huaxin_trade_debug = __mylogger.get_logger("local_huaxin_trade_debug")
logger_local_huaxin_l1 = __mylogger.get_logger("local_huaxin_l1_show_info")
logger_local_huaxin_g_cancel = __mylogger.get_logger("local_huaxin_g_cancel")
logger_local_huaxin_l2_buy_no = __mylogger.get_logger("local_huaxin_l2_buy_no")
logger_local_huaxin_l1_trade_info = __mylogger.get_logger("local_huaxin_l1_trade_info")
logger_local_huaxin_l2_special_volume = __mylogger.get_logger("local_huaxin_l2_special_volume")

# Request tracing loggers.
logger_request_debug = __mylogger.get_logger("request_debug")
logger_tuoguan_request_debug = __mylogger.get_logger("tuoguan_request_debug")
| | | |
| | | |
def close_print():
    """Silence console output for the current process.

    Calls ``logging.basicConfig(level=logging.ERROR)`` and, on non-Windows
    hosts, points file descriptor 1 (stdout) at the null device.
    """
    logging.basicConfig(level=logging.ERROR)
    if constant.is_windows():
        return
    devnull_fd = os.open(os.devnull, os.O_WRONLY)
    if devnull_fd == 1:
        # stdout was already closed, so the new descriptor landed on 1 directly.
        return
    try:
        # dup2 replaces fd 1 in one step. Closing fd 1 and then opening the null
        # device only redirected stdout when 1 happened to be the lowest free
        # descriptor, and left fd 1 non-inheritable for exec'd children.
        os.dup2(devnull_fd, 1)
    finally:
        os.close(devnull_fd)
| | | |
| | | |
def printlog(*args):
    """Log the positional arguments through ``logger_print`` at info level.

    The arguments are passed as one tuple, so the record's message is that
    tuple rather than the individual values.
    """
    logger_print.info(args)
| | | |
| | | |
if __name__ == "__main__":
    # Ad-hoc check when run as a script: write one sample record through the
    # kpl_open_limit_up logger. A set literal needs no extra set() wrapper.
    open_limit_up_codes = {"000333", "000222"}
    logger_kpl_open_limit_up.info(f"炸板代码:{open_limit_up_codes}")
New file |
| | |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_request_debug |
| | | from utils import tool |
| | | |
| | | |
def request_info(type_name, content, thread_id=None):
    """Write a request-trace line to ``logger_request_debug`` via ``async_log_util.info``.

    :param type_name: tag placed in the second bracket pair of the line
    :param content: message text appended after the tags
    :param thread_id: id placed in the first bracket pair; when falsy,
        ``tool.get_thread_id()`` is used instead
    """
    caller_id = thread_id or tool.get_thread_id()
    message = f"【{caller_id}】【{type_name}】 {content}"
    async_log_util.info(logger_request_debug, message)
| | | |
| | |
| | | import multiprocessing |
| | | import threading |
| | | |
| | | import constant |
| | | import data_server |
| | | import log |
| | | import middle_api_server |
| | | import middle_cb_api_server |
| | | import middle_server |
| | | # from huaxin_client import huaxin_trade_client |
| | | # from trade import huaxin_trade_api |
| | | # from huaxin_client import huaxin_trade_client |
| | | from log_module import async_log_util |
| | | # from trade import huaxin_trade_api |
| | | |
| | | if __name__ == "__main__": |
| | | t1 = threading.Thread(target=lambda: middle_api_server.run(), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: data_server.run("0.0.0.0", constant.DATA_SERVER_PORT), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: log.async_log_util.run_sync(), daemon=True) |
| | | t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: middle_server.run(12880), daemon=True) |
| | | t1.start() |
| | |
| | | t1.start() |
| | | # t1 = threading.Thread(target=lambda: middle_l1_data_server.run(12881), daemon=True) |
| | | # t1.start() |
| | | |
| | | # 运行仿真交易 |
| | | # queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue() |
| | | # huaxin_trade_api.run_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query) |
| | | |
| | | # ===========运行交易端========== |
| | | # tradeProcess = multiprocessing.Process( |
| | | # target=huaxin_trade_client.run, |
| | | # args=(queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query, queue_strategy_r_trade_w,)) |
| | | # tradeProcess.start() |
| | | |
| | | middle_server.run() |
| | |
| | | import time |
| | | |
| | | import constant |
| | | import log |
| | | import socket_manager |
| | | import trade_manager |
| | | from db import mysql_data, redis_manager |
| | | from db.redis_manager import RedisUtils |
| | | from log import logger_request_debug |
| | | from log_module import log, request_log_util |
| | | from log_module.log import logger_request_debug |
| | | from middle_l1_data_server import L1DataManager |
| | | from output import push_msg_manager |
| | | from utils import socket_util, hosting_api_util, huaxin_trade_record_manager, huaxin_util, tool, global_data_cache_util |
| | |
| | | data_json = json.loads(data_str) |
| | | type_ = data_json['type'] |
| | | try: |
| | | log.request_info("middle_api_server", f"请求开始:{type_}") |
| | | request_log_util.request_info("middle_api_server", f"请求开始:{type_}") |
| | | if type(type_) == int: |
| | | # 处理数字型TYPE |
| | | return_str = self.process_num_type(sk, type_, data_str) |
| | |
| | | results = L1DataManager.get_current_l1_data() |
| | | return_str = json.dumps({"code": 0, "data": results}) |
| | | finally: |
| | | log.request_info("middle_api_server", f"请求结束:{type_}") |
| | | request_log_util.request_info("middle_api_server", f"请求结束:{type_}") |
| | | break |
| | | # sk.close() |
| | | except Exception as e: |
| | |
| | | import logging |
| | | import socket |
| | | import socketserver |
| | | import log |
| | | from log import logger_request_debug |
| | | |
| | | from log_module import log, request_log_util |
| | | from log_module.log import logger_request_debug |
| | | from utils import socket_util, hosting_api_util |
| | | |
| | | """ |
| | |
| | | data_json = json.loads(data_str) |
| | | type_ = data_json['type'] |
| | | try: |
| | | log.request_info("middle_cb_api_server", f"请求开始:{type_}") |
| | | request_log_util.request_info("middle_cb_api_server", f"请求开始:{type_}") |
| | | is_sign_right = socket_util.is_client_params_sign_right(data_json) |
| | | # ------客户端请求接口------- |
| | | if type_ == 'buy': |
| | |
| | | return_str = json.dumps(result) |
| | | break |
| | | finally: |
| | | log.request_info("middle_cb_api_server", f"请求结束:{type_}") |
| | | request_log_util.request_info("middle_cb_api_server", f"请求结束:{type_}") |
| | | break |
| | | # sk.close() |
| | | except Exception as e: |
| | |
| | | import json |
| | | import logging |
| | | import queue |
| | | import random |
| | | import socket |
| | | import socketserver |
| | | import threading |
| | | import time |
| | | |
| | | import constant |
| | | import log |
| | | import socket_manager |
| | | from db import mysql_data |
| | | from db.redis_manager import RedisUtils, RedisManager |
| | | from log import logger_debug, logger_request_debug |
| | | from output import push_msg_manager |
| | | from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool |
| | | from utils.juejin_util import JueJinHttpApi |
| | | |
| | | from log_module import log |
| | | from utils import socket_util |
| | | |
| | | trade_data_request_queue = queue.Queue() |
| | | |
| | |
| | | import builtins |
| | | import copy |
| | | import hashlib |
| | | import json |
| | | import logging |
| | |
| | | import time |
| | | |
| | | import constant |
| | | import log |
| | | import socket_manager |
| | | from db import mysql_data |
| | | from db.redis_manager import RedisUtils, RedisManager |
| | | from log import logger_debug, logger_request_debug |
| | | from log_module import log |
| | | from log_module.log import logger_debug |
| | | from middle_l1_data_server import L1DataManager |
| | | from output import push_msg_manager |
| | | from trade import huaxin_trade_api |
| | | from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool |
| | | from utils.juejin_util import JueJinHttpApi |
| | | |
| | | trade_data_request_queue = queue.Queue() |
| | | |
| | | __mysql_config_dict = {} |
| | | |
| | | |
def get_mysql_config(db_name):
    """Return the MySQL connection config for ``db_name``.

    The first request for a name deep-copies ``constant.MYSQL_CONFIG``, sets its
    ``"database"`` entry to ``db_name`` and stores the copy in
    ``__mysql_config_dict``; later requests return that same stored dict.

    :param db_name: database name to put into the config
    :return: config dict for that database
    """
    try:
        return __mysql_config_dict[db_name]
    except KeyError:
        pass
    fresh_config = copy.deepcopy(constant.MYSQL_CONFIG)
    fresh_config["database"] = db_name
    __mysql_config_dict[db_name] = fresh_config
    return fresh_config
| | | |
| | | |
| | | class MyTCPServer(socketserver.TCPServer): |
| | |
| | | db = data["db"] |
| | | cmd = data["cmd"] |
| | | args = data.get("args") |
| | | mysql = mysql_data.Mysqldb() |
| | | mysql_config = get_mysql_config(db) |
| | | mysql = mysql_data.Mysqldb(mysql_config) |
| | | method = getattr(mysql, cmd) |
| | | args_ = [] |
| | | if args: |
| | |
| | | datas = data_json["data"] |
| | | L1DataManager.add_datas(datas) |
| | | break |
| | | elif data_json["type"] == "simulation_trade": |
| | | datas = data_json["data"] |
| | | ctype = datas["ctype"] |
| | | data = datas["data"] |
| | | result = huaxin_trade_api.request(ctype,data) |
| | | result_str = json.dumps({"code": 0, "data": result}, default=str) |
| | | sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) |
| | | |
| | | |
| | | except Exception as e: |
| | | log.logger_tuoguan_request_debug.exception(e) |
| | | finally: |
| | | if time.time() - __start_time > 2: |
| | | log.logger_tuoguan_request_debug.info(f"耗时:{int(time.time() - __start_time)}s 数据:{data_json}") |
| | | log.logger_tuoguan_request_debug.info( |
| | | f"耗时:{int(time.time() - __start_time)}s 数据:{data_json}") |
| | | else: |
| | | # 断开连接 |
| | | break |
| | |
| | | pass |
| | | |
| | | |
| | | def run(port = constant.MIDDLE_SERVER_PORT): |
| | | def run(port=constant.MIDDLE_SERVER_PORT): |
| | | print("create MiddleServer") |
| | | t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) |
| | | t1.start() |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(builtins.type("")==str) |
| | | pass |
| | |
| | | """ |
| | | import json |
| | | |
| | | from log import logger_debug |
| | | from log_module.log import logger_debug |
| | | from utils import socket_util |
| | | |
| | | TYPE_ORDER_ALMOST_DEAL = "order_almost_deal" # 订单即将成交 |
| | |
| | | |
| | | @classmethod |
| | | def add_client(cls, _type, rid, sk): |
| | | if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL: |
| | | if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL or _type == cls.CLIENT_TYPE_TRADE_CB: |
| | | # 交易列表 |
| | | if _type not in cls.socket_client_dict: |
| | | cls.socket_client_dict[_type] = [] |
| | |
| | | |
| | | @classmethod |
| | | def acquire_client(cls, _type): |
| | | if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL: |
| | | if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL or _type == cls.CLIENT_TYPE_TRADE_CB: |
| | | if _type in cls.socket_client_dict: |
| | | # 根据排序活跃时间排序 |
| | | client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[ |
| | |
| | | |
| | | import constant |
| | | from db.redis_manager import RedisUtils |
| | | from log import logger_kpl_limit_up_reason_change |
| | | from log_module.log import logger_kpl_limit_up_reason_change |
| | | from utils import tool |
| | | |
| | | # 开盘啦历史涨停数据管理 |
New file |
| | |
| | | """ |
| | | 交易API |
| | | """ |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import queue |
| | | import random |
| | | import threading |
| | | import time |
| | | import concurrent.futures |
| | | |
| | | from utils import socket_util, tool |
| | | |
| | | __response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15) |
| | | __save_data_queue = queue.Queue() |
| | | |
| | | |
def __run_recv_queue_trade(queue: multiprocessing.Queue):
    """Consume messages written by the trade process and dispatch them, forever.

    Every non-empty item is decoded with ``json.loads``. A message whose
    ``type`` is ``"response"`` is handed to the response thread pool, which
    stores its ``data`` under its ``request_id`` via ``set_response``.
    ``"trade_callback"`` messages are recognised but no handler is attached.

    :param queue: queue to read from; when ``None`` the function returns at once
    """

    def __set_response(data_json):
        # A response without a request_id cannot be matched to any waiter.
        if 'request_id' not in data_json:
            return
        set_response(data_json["request_id"], data_json['data'])

    if queue is None:
        return
    while True:
        try:
            val = queue.get()
            if not val:
                continue
            data_json = json.loads(val)
            type_ = data_json["type"]
            if type_ == "response":
                # Reply to a request this process sent earlier.
                __response_thread_pool.submit(__set_response, data_json)
            elif type_ == "trade_callback":
                # Trade callback: nothing is done with it here yet.
                pass
        except Exception as e:
            # Keep the reader alive, but leave a trace instead of silently
            # discarding malformed messages or queue failures.
            logging.exception(e)
| | | |
| | | |
# Wire up the trade communication queues.
# NOTE: this function is not used for the time being.
def run_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_query_):
    """
    Publish the outbound queues as module globals and start the reader thread.

    :param queue_strategy_r_trade_w_: queue the trade results are received on
    :param queue_strategy_w_trade_r_: queue trade commands are sent on
    :param queue_strategy_w_trade_r_for_query_: queue query commands are sent on
    :return: None
    """
    global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query
    queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query = (
        queue_strategy_w_trade_r_,
        queue_strategy_w_trade_r_for_query_,
    )

    # Read trade results in the background.
    reader = threading.Thread(
        target=__run_recv_queue_trade,
        args=(queue_strategy_r_trade_w_,),
        daemon=True,
    )
    reader.start()
| | | |
| | | |
| | | # 交易通道的错误次数 |
| | | trade_pipe_channel_error_count = 0 |
| | | |
| | | |
# Whether the pipe-based trade channel is working.
def is_pipe_channel_normal():
    """Report the pipe trade channel as healthy; currently always ``True``."""
    return True
| | | |
| | | |
# Probe the trade channel.
def test_trade_channel():
    """Send a random ``sid`` through the trade channel and check the echo.

    On success the module-level ``trade_pipe_channel_error_count`` is reset
    to 0. On any failure it is incremented, capped at 100. A probe that
    raises (for example the read timing out) or returns a reply of an
    unexpected shape counts as a failure instead of propagating, so the
    counter reflects it.

    :return: ``True`` when the reply has ``code == 0`` and echoes ``sid``, else ``False``
    """
    global trade_pipe_channel_error_count
    sid = random.randint(0, 1000000)
    try:
        result = __test_trade_channel(sid)
        ok = result["code"] == 0 and result["data"]["data"]["sid"] == sid
    except Exception:
        # Timeout, broken queue or malformed reply: the channel is not usable.
        ok = False
    if ok:
        trade_pipe_channel_error_count = 0
        return True
    trade_pipe_channel_error_count = min(trade_pipe_channel_error_count + 1, 100)
    return False
| | | |
| | | |
class ClientSocketManager:
    """Class-level registry of client sockets, their locks and heartbeats.

    ``socket_client_dict`` maps a client type to its connection(s): the
    ``CLIENT_TYPE_TRADE`` entry is a list of ``(rid, sk)`` tuples, every other
    type holds a single ``(rid, sk)`` tuple that a later registration replaces.
    ``socket_client_lock_dict`` maps ``rid`` to a ``threading.Lock`` and
    ``active_client_dict`` maps ``rid`` to the time of its last heartbeat.
    """
    # Client types
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
    CLIENT_TYPE_DEAL_LIST = "deal_list"
    CLIENT_TYPE_POSITION_LIST = "position_list"
    CLIENT_TYPE_MONEY = "money"
    CLIENT_TYPE_DEAL = "deal"
    CLIENT_TYPE_CMD_L2 = "l2_cmd"
    socket_client_dict = {}
    socket_client_lock_dict = {}
    active_client_dict = {}

    @staticmethod
    def _close_quietly(sk):
        """Close *sk*, ignoring errors: the peer may already be gone."""
        try:
            sk.close()
        except Exception:
            pass

    @classmethod
    def list_client(cls, _type):
        """Return the ``(rid, sk)`` tuples registered for *_type* as a list.

        For the trade type this is the registry's own list; for other types a
        one-element list is built. An unknown type yields ``[]``.
        """
        if _type not in cls.socket_client_dict:
            return []
        entry = cls.socket_client_dict[_type]
        if _type == cls.CLIENT_TYPE_TRADE:
            return entry
        return [entry]

    @classmethod
    def add_client(cls, _type, rid, sk):
        """Register socket *sk* under *_type* and create a fresh lock for *rid*."""
        if _type == cls.CLIENT_TYPE_TRADE:
            # Several trade connections may coexist.
            cls.socket_client_dict.setdefault(_type, []).append((rid, sk))
        else:
            cls.socket_client_dict[_type] = (rid, sk)
        cls.socket_client_lock_dict[rid] = threading.Lock()

    # Whether the client is currently locked
    @classmethod
    def is_client_locked(cls, rid):
        """Return the lock state for *rid*, or ``None`` when *rid* has no lock."""
        lock = cls.socket_client_lock_dict.get(rid)
        if lock is None:
            return None
        return lock.locked()

    @classmethod
    def acquire_client(cls, _type):
        """Lock and return a free ``(rid, sk)`` for *_type*, or ``None``.

        Trade clients are tried from the most recent heartbeat to the oldest
        (clients without a heartbeat sort last). The acquire is non-blocking,
        so a busy client is skipped rather than waited for.
        """
        if _type not in cls.socket_client_dict:
            return None
        if _type == cls.CLIENT_TYPE_TRADE:
            candidates = sorted(cls.socket_client_dict[_type],
                                key=lambda c: cls.active_client_dict.get(c[0], 0),
                                reverse=True)
        else:
            candidates = [cls.socket_client_dict[_type]]
        for client in candidates:
            lock = cls.socket_client_lock_dict.get(client[0])
            # A non-blocking acquire signals failure by returning False; it
            # never raises a timeout, so no exception handler is needed.
            if lock is not None and lock.acquire(blocking=False):
                return client
        return None

    @classmethod
    def release_client(cls, client_id):
        """Release the lock of *client_id* if it is held.

        :return: ``True`` when *client_id* has a lock registered, else ``False``
        """
        lock = cls.socket_client_lock_dict.get(client_id)
        if lock is None:
            return False
        if lock.locked():
            lock.release()
        return True

    @classmethod
    def del_client(cls, rid):
        """Drop the lock of *rid*, close its socket and unregister it."""
        # Remove the thread lock
        cls.socket_client_lock_dict.pop(rid, None)
        # Remove the socket; iterate a snapshot because entries may be popped.
        for _type, entry in list(cls.socket_client_dict.items()):
            if isinstance(entry, list):
                for client in entry:
                    if client[0] == rid:
                        cls._close_quietly(client[1])
                        entry.remove(client)
                        break
            elif isinstance(entry, tuple):
                if entry[0] == rid:
                    cls._close_quietly(entry[1])
                    cls.socket_client_dict.pop(_type, None)
                    break

    # Heartbeat
    @classmethod
    def heart(cls, rid):
        """Record the current time as the last heartbeat of *rid*."""
        cls.active_client_dict[rid] = time.time()

    @classmethod
    def del_invalid_clients(cls):
        """Remove every client whose last heartbeat is more than 20s old."""
        now = time.time()
        # Snapshot the heartbeats: entries are removed while scanning.
        for rid, last_seen in list(cls.active_client_dict.items()):
            if now - last_seen > 20:
                cls.del_client(rid)
                # Forget the heartbeat too, otherwise the same dead client
                # would be processed again on every sweep.
                cls.active_client_dict.pop(rid, None)
| | | |
| | | |
| | | TRADE_DIRECTION_BUY = 1 |
| | | TRADE_DIRECTION_SELL = 2 |
| | | |
| | | # 超时时间2s |
| | | TIMEOUT = 2.0 |
| | | # 交易代理 |
| | | TRADE_DELEGATED = True |
| | | |
| | | # 等待响应的request_id |
| | | __request_response_dict = {} |
| | | |
| | | |
def __get_request_id(type):
    """Build an id of the form ``r_<type>_<round(time * 10000)>_<random 0..100000>``."""
    stamp = round(time.time() * 10000)
    nonce = random.randint(0, 100000)
    return "r_{}_{}_{}".format(type, stamp, nonce)
| | | |
| | | |
# Send a request to the trade process.
def __request(_type, data, request_id=None, log_enable=True, is_trade=False):
    """
    Sign a request and enqueue it without blocking.

    Trade requests (placing / cancelling orders) and query requests
    (positions / available funds / delegate list / deal list) go to separate queues.

    :param _type: request type; also used when an id has to be generated
    :param data: request payload
    :param request_id: id to use; a new one is generated when it is falsy
    :param log_enable: accepted but not used by this function
    :param is_trade: ``True`` selects the trade queue, otherwise the query queue
    :return: the request id that was sent
    """
    request_id = request_id or __get_request_id(_type)
    try:
        signed = socket_util.encryp_client_params_sign({
            "type": _type,
            "data": data,
            "request_id": request_id,
            "time": time.time(),
        })
        target = queue_strategy_w_trade_r if is_trade else queue_strategy_w_trade_r_for_query
        target.put_nowait(signed)
    except BrokenPipeError:
        # Propagated as-is, without logging.
        raise
    except Exception as e:
        logging.exception(e)
        raise
    return request_id
| | | |
| | | |
def __read_response(request_id, blocking, timeout=TIMEOUT, log_enable=True):
    """Wait for the response stored under *request_id* and remove it.

    The response table is polled every 5 ms. The deadline is measured with
    the monotonic clock, so wall-clock adjustments cannot shorten or extend it.

    :param request_id: id returned when the request was sent
    :param blocking: when falsy nothing is read and ``None`` is returned
    :param timeout: seconds to wait before giving up
    :param log_enable: accepted but not used by this function
    :return: the stored response, or ``None`` when *blocking* is falsy
    :raises TimeoutError: no response arrived within *timeout* seconds
    """
    if not blocking:
        return None
    deadline = time.monotonic() + timeout
    while True:
        time.sleep(0.005)
        if request_id in __request_response_dict:
            # The response has arrived.
            return __request_response_dict.pop(request_id)
        if time.monotonic() > deadline:
            raise TimeoutError(f"读取内容超时: request_id={request_id}")
| | | |
| | | |
| | | |
def set_response(request_id, response):
    """Store *response* under *request_id* for the reader waiting on it.

    A falsy *request_id* (a message nobody asked for) is ignored.
    """
    if not request_id:
        # Passively received message: no waiter to hand it to.
        return
    # Reply to a request that was actively sent.
    __request_response_dict[request_id] = response
| | | |
| | | |
def request(type_, data):
    """Send *data* as a request of *type_* and block until its response is read.

    The request is sent with the sender's defaults, i.e. not flagged as a trade.
    """
    return __read_response(__request(type_, data), blocking=True)
| | | |
| | | |
# Round-trip a "test" message carrying sid through the trade channel.
def __test_trade_channel(sid):
    """Send a ``test`` request containing *sid* and return the response read for it."""
    payload = {"type": "test", "data": {"sid": sid}}
    request_id = __request("test", payload, log_enable=False)
    return __read_response(request_id, True, log_enable=False)
| | | |
| | | |
def parseResponse(data_str):
    """Unwrap the payload of a response envelope.

    The envelope has the shape ``{"data": {"code": ..., "msg": ..., "data": ...}}``
    and may be given as a mapping or as JSON text (``str``, ``bytes`` or
    ``bytearray``).

    :param data_str: the response envelope
    :return: the inner ``data`` value
    :raises Exception: when *data_str* is empty, or with the response's
        ``msg`` when its ``code`` is not 0
    """
    if not data_str:
        raise Exception("反馈内容为空")
    res = data_str
    # json.loads accepts bytes as well as text, so decode both forms.
    if isinstance(res, (str, bytes, bytearray)):
        res = json.loads(res)
    res = res['data']
    if res['code'] != 0:
        raise Exception(res['msg'])
    return res['data']
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | pass |
| | |
| | | return common_request_for_cb({"ctype": "get_account_money"}, blocking) |
| | | |
| | | |
def refresh_trade_data_for_cb(type_, blocking=True):
    """
    Request a refresh of convertible-bond trade data.

    :param type_: sent to the trade client as ``ctype``
    :param blocking: forwarded to the response reader
    :return: whatever the response reader returns for this request
    """
    sinfo = f"cb_{API_TYPE_REFRESH_TRADE_DATA}_{round(time.time() * 1000)}"
    payload = {"type": API_TYPE_REFRESH_TRADE_DATA, "ctype": type_, "sinfo": sinfo}
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE_CB, payload)
    return __read_response(client, request_id, blocking)
| | | |
| | | |
| | | |
| | | def common_request_for_cb(params, blocking=True): |
| | | """ |
| | | 通用请求 |
| | |
| | | import threading |
| | | import time |
| | | |
| | | from log import logger_kp_msg |
| | | from log_module.log import logger_kp_msg |
| | | from utils import log_export |
| | | |
| | | CLIENT_IDS = ["zjb", "hxh"] |