| | |
| | | if _type == CLIENT_TYPE_TRADE: |
| | | # 交易 |
| | | ctype = data["trade_type"] |
| | | async_log_util.info(logger_trade, f"交易开始") |
| | | async_log_util.info(logger_trade, f"交易开始:{request_id}") |
| | | cls.action_callback.OnTrade(client_id, request_id, sk, ctype, data) |
| | | async_log_util.info(logger_trade, f"交易结束") |
| | | async_log_util.info(logger_trade, f"交易结束:{request_id}") |
| | | elif _type == CLIENT_TYPE_MONEY: |
| | | cls.action_callback.OnMoney(client_id, request_id, sk) |
| | | elif _type == CLIENT_TYPE_DEAL_LIST: |
| | |
| | | import multiprocessing |
| | | import queue |
| | | import threading |
| | | import time |
| | | |
| | | from huaxin_client import l2_data_manager |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_debug |
| | | |
| | | __queue = queue.Queue() |
| | | |
| | | |
def start_thread():
    """Print the name and identifier of the calling thread.

    The visible body only reports on ``threading.current_thread()``; it
    does not create or start a thread itself.
    """
    current = threading.current_thread()
    # Thread.getName() is deprecated since Python 3.10; the ``name``
    # attribute is the supported spelling and yields the same value.
    print(current.name, current.ident)
def add_data(msg):
    """Put ``msg`` on the module-level ``__queue`` and flag a slow enqueue.

    The message is wrapped as ``{"msg": msg}`` before being queued. If the
    ``put`` call itself takes longer than 2 ms, a notice is printed.

    Args:
        msg: payload to enqueue; stored as-is under the ``"msg"`` key.
    """
    # Fixed one-second delay before enqueueing, kept from the original helper.
    time.sleep(1)
    # perf_counter() is monotonic, so the measured duration cannot be
    # distorted (or turn negative) when the wall clock is adjusted, which
    # time.time() does not guarantee.
    started = time.perf_counter()
    __queue.put({"msg": msg})
    elapsed = time.perf_counter() - started
    if elapsed > 0.002:
        print("加入日志耗时")
| | | |
| | | |
# Producer half of the pipe demo: loops forever, writing the UTF-8 encoded
# text "hello world:{i}" for i = 0..9 to `pipe` with send_bytes() and pausing
# with time.sleep(1).
# NOTE(review): this dump has lost the original indentation, so it is not
# visible whether the one-second sleep runs after every message or once per
# batch of ten -- confirm against the real file.
| | | def test_process_1(pipe): |
| | | while True: |
| | | for i in range(10): |
| | | pipe.send_bytes(f"hello world:{i}".encode("utf-8")) |
| | | time.sleep(1) |
| | | |
| | | |
def test_process_2(pipe):
    """Consumer half of the pipe demo: print every non-empty message from ``pipe``.

    Args:
        pipe: object exposing ``recv_bytes()``, e.g. one end of
            ``multiprocessing.Pipe()``.

    Returns:
        None, once the sending side has been closed and all pending data
        has been read. Without this handling the ``EOFError`` raised by
        ``recv_bytes()`` would escape the endless loop as an uncaught error.
    """
    while True:
        try:
            payload = pipe.recv_bytes()
        except EOFError:
            # The peer closed its end and nothing is left to read.
            return
        if payload:
            print("接受到内容:", payload)


# The ``test_`` prefix would make pytest collect this helper as a test case
# and fail on the unknown ``pipe`` fixture; opt out explicitly.
test_process_2.__test__ = False
| | | |
| | | |
if __name__ == "__main__":
    # Non-blocking Queue.get() check: the first call returns the queued item,
    # the second finds the queue empty.
    _queue = queue.Queue()
    _queue.put("123123")
    print(_queue.get(False))
    try:
        print(_queue.get(False))
    except queue.Empty:
        # get(block=False) raises queue.Empty on an empty queue; unhandled,
        # that exception ended the script before the pipe demo below ran.
        print("队列为空")

    p1, p2 = multiprocessing.Pipe()
    # (original note: "L1 subscription data")
    # Hand the function and its argument to Process directly: a lambda target
    # cannot be pickled, so start() fails under the "spawn" start method.
    # Both workers loop forever, so they run as daemons and are terminated
    # when the main process exits after input() returns.
    progress1 = multiprocessing.Process(target=test_process_1, args=(p1,), daemon=True)
    progress2 = multiprocessing.Process(target=test_process_2, args=(p2,), daemon=True)
    progress1.start()
    progress2.start()

    # Keep the main process alive until the user presses Enter.
    input()
| | |
| | | trade_cmd_callback(TradeRequest(_type, root_data, request_id)) |
| | | else: |
| | | start_time = time.time() |
| | | pipe_trade.send(json.dumps(root_data).encode("utf-8")) |
| | | pipe_trade.send_bytes(json.dumps(root_data).encode("utf-8")) |
| | | use_time = int((time.time() - start_time)*1000) |
| | | if use_time > 10: |
| | | async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}") |
| | |
| | | |
| | | # 通过量下单,返回(代码,账号ID,订单号) |
| | | def order_volume(code, price, count, last_data_index, order_ref=None): |
| | | async_log_util.info(logger_trade, f"{code}下单方法开始") |
| | | price = round(float(price), 2) |
| | | if code.find("00") != 0 and code.find("60") != 0: |
| | | raise Exception("只支持00开头与60开头的代码下单") |
| | | # 保存下单信息 |
| | | huaxin.huaxin_delegate_postion_manager.place_order(code, price, count, last_data_index) |
| | | if not constant.TRADE_ENABLE: |
| | | return |
| | | result = None |
| | | blocking = False |
| | | async_log_util.info(logger_trade, f"{code} trade_huaxin.order_volume 开始") |
| | | try: |
| | | async_log_util.info(logger_trade, f"{code}下单开始") |
| | | result = huaxin_trade_api.order(1, code, count, price, blocking=blocking, order_ref=order_ref) |
| | | async_log_util.info(logger_trade, f"{code}下单结束") |
| | | except Exception as e: |
| | | if str(e).find("超时") >= 0: |
| | | # 此处出现超时,需要通过读取委托列表来设置订单信息 |
| | | async_log_util.error(hx_logger_trade_debug, f"{code}:下单结果反馈出错-{str(e)}") |
| | | else: |
| | | raise e |
| | | |
| | | if result: |
| | | if blocking: |
| | | if result['code'] == 0: |
| | | result = result["data"] |
| | | if result["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected: |
| | | async_log_util.info(hx_logger_trade_debug, f"{code}:下单失败:{result.get('statusMsg')}") |
| | | raise Exception(result.get('statusMsg')) |
| | | else: |
| | | __TradeOrderIdManager.add_order_id(code, result["accountID"], result["orderSysID"]) |
| | | async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 orderSysID:{result['orderSysID']}") |
| | | return result["securityId"], result["accountID"], result["orderSysID"] |
| | | price = round(float(price), 2) |
| | | if code.find("00") != 0 and code.find("60") != 0: |
| | | raise Exception("只支持00开头与60开头的代码下单") |
| | | # 保存下单信息 |
| | | huaxin.huaxin_delegate_postion_manager.place_order(code, price, count, last_data_index) |
| | | if not constant.TRADE_ENABLE: |
| | | return |
| | | result = None |
| | | blocking = False |
| | | try: |
| | | async_log_util.info(logger_trade, f"{code}下单开始") |
| | | result = huaxin_trade_api.order(1, code, count, price, blocking=blocking, order_ref=order_ref) |
| | | async_log_util.info(logger_trade, f"{code}下单结束") |
| | | except Exception as e: |
| | | if str(e).find("超时") >= 0: |
| | | # 此处出现超时,需要通过读取委托列表来设置订单信息 |
| | | async_log_util.error(hx_logger_trade_debug, f"{code}:下单结果反馈出错-{str(e)}") |
| | | else: |
| | | raise Exception(result['msg']) |
| | | raise e |
| | | |
| | | if result: |
| | | if blocking: |
| | | if result['code'] == 0: |
| | | result = result["data"] |
| | | if result["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected: |
| | | async_log_util.info(hx_logger_trade_debug, f"{code}:下单失败:{result.get('statusMsg')}") |
| | | raise Exception(result.get('statusMsg')) |
| | | else: |
| | | __TradeOrderIdManager.add_order_id(code, result["accountID"], result["orderSysID"]) |
| | | async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 orderSysID:{result['orderSysID']}") |
| | | return result["securityId"], result["accountID"], result["orderSysID"] |
| | | else: |
| | | raise Exception(result['msg']) |
| | | else: |
| | | order_ref = result["order_ref"] |
| | | __TradeOrderIdManager.add_order_ref(code, order_ref) |
| | | async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 orderRef:{order_ref}") |
| | | return code, "local", order_ref |
| | | else: |
| | | order_ref = result["order_ref"] |
| | | __TradeOrderIdManager.add_order_ref(code, order_ref) |
| | | async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 orderRef:{order_ref}") |
| | | return code, "local", order_ref |
| | | else: |
| | | raise Exception("下单失败,无返回") |
| | | raise Exception("下单失败,无返回") |
| | | finally: |
| | | async_log_util.info(logger_trade, f"{code} trade_huaxin.order_volume 结束") |
| | | |
| | | |
| | | |
| | | def order_success(code, accountId, orderSysID, orderRef, insertTime): |
| | |
| | | # 交易管理器 |
| | | import datetime |
| | | import json |
| | | import random |
| | | import threading |
| | | import time |
| | | |
| | | import dask |
| | | import concurrent.futures |
| | | |
| | | from code_attribute import gpcode_manager |
| | | from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager |
| | |
| | | |
| | | # 开始交易 |
| | | def start_buy(code, capture_timestamp, last_data, last_data_index): |
| | | # @dask.delayed |
| | | def is_forbidden(code): |
| | | if l2_trade_util.is_in_forbidden_trade_codes(code): |
| | | return Exception("禁止交易") |
| | | return None, None |
| | | |
| | | # @dask.delayed |
| | | def is_state_right(code): |
| | | trade_state = __CodesTradeStateManager.get_trade_state_cache(code) |
| | | if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING: |
| | | return Exception("代码处于不可交易状态"), trade_state |
| | | return None, trade_state |
| | | |
| | | # @dask.delayed |
| | | def is_money_enough(code): |
| | | money = AccountAvailableMoneyManager().get_available_money_cache() |
| | | if money is None: |
| | |
| | | return Exception("账户可用资金不足"), price |
| | | return None, price |
| | | |
| | | # @dask.delayed |
| | | def can_trade(*args): |
| | | for arg in args: |
| | | if arg[0] is not None: |
| | | return arg[0], None, None |
| | | return None, args[1][1], args[2][1] |
| | | |
| | | _start_time = tool.get_now_timestamp() |
| | | |
| | | ex = is_forbidden(code)[0] |
| | | if ex: |
| | | raise ex |
| | | ex, trade_state = is_state_right(code) |
| | | if ex: |
| | | raise ex |
| | | ex, price = is_money_enough(code) |
| | | if ex: |
| | | raise ex |
| | | |
| | | print("开始买入") |
| | | async_log_util.info(logger_trade, "{}开始买入".format(code)) |
| | | __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER) |
| | | # 状态改变过后必须要有本地下单编号 |
| | | # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "买入判断时间", force=True) |
| | | __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index) |
| | | # l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "异步买入时间", force=True) |
| | | async_log_util.info(logger_trade, "{} trade.manager.start_buy 开始".format(code)) |
| | | try: |
| | | try: |
| | | ex = is_forbidden(code)[0] |
| | | if ex: |
| | | raise ex |
| | | ex, trade_state = is_state_right(code) |
| | | if ex: |
| | | raise ex |
| | | ex, price = is_money_enough(code) |
| | | if ex: |
| | | raise ex |
| | | finally: |
| | | async_log_util.info(logger_trade, "{} trade.manager.start_buy 判断是否可买".format(code)) |
| | | __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER) |
| | | # 状态改变过后必须要有本地下单编号 |
| | | __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index) |
| | | finally: |
| | | async_log_util.info(logger_trade, "{} trade.manager.start_buy 结束".format(code)) |
| | | |
| | | |
| | | # 中断买入 |
| | |
| | | # 购买 |
| | | # @tool.async_call |
| | | def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index): |
| | | async_log_util.info(logger_trade, "{} trade_manager.__buy 开始".format(code)) |
| | | try: |
| | | |
| | | if constant.API_TRADE_ENABLE: |
| | | count = (constant.BUY_MONEY_PER_CODE // int(round(float(price) * 100))) * 100 |
| | | # 最低下单1手 |
| | |
| | | logger_trade.error("{}买入异常{}".format(code, str(e))) |
| | | logger_trade.exception(e) |
| | | raise e |
| | | finally: |
| | | async_log_util.info(logger_trade, "{} trade_manager.__buy 结束".format(code)) |
| | | |
| | | |
| | | # 下单成功 |
| | |
| | | |
| | | __CodesTradeStateManager = CodesTradeStateManager() |
| | | |
| | | __cancel_order_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) |
| | | |
| | | |
| | | # 开始取消买入 |
| | | def start_cancel_buy(code, force=False): |
| | | async_log_util.info(logger_trade, "{}进入撤单方法".format(code)) |
| | | async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 开始".format(code)) |
| | | trade_state = __CodesTradeStateManager.get_trade_state_cache(code) |
| | | if trade_state == TRADE_STATE_BUY_SUCCESS: |
| | | return None |
| | | if not force: |
| | | if trade_state != TRADE_STATE_BUY_PLACE_ORDER and trade_state != TRADE_STATE_BUY_DELEGATED: |
| | | return None |
| | | try: |
| | | async_log_util.info(logger_trade, "{}开始撤单".format(code)) |
| | | if trade_state == TRADE_STATE_BUY_SUCCESS: |
| | | return None |
| | | if not force: |
| | | if trade_state != TRADE_STATE_BUY_PLACE_ORDER and trade_state != TRADE_STATE_BUY_DELEGATED: |
| | | return None |
| | | __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING) |
| | | async_log_util.info(logger_trade, "{}撤单方法开始".format(code)) |
| | | if constant.API_TRADE_ENABLE: |
| | | if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN: |
| | | trade_juejin.cancel_order(code) |
| | |
| | | trade_huaxin.cancel_order(code) |
| | | else: |
| | | guiTrade.cancel_buy(code) |
| | | async_log_util.info(logger_trade, "{}撤单方法结束".format(code)) |
| | | __cancel_success(code) |
| | | # 再次撤单 |
| | | if constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN: |
| | | threading.Thread(target=lambda: trade_huaxin.cancel_order(code, msg="再次撤单"), daemon=True).start() |
| | | __cancel_order_thread_pool.submit(lambda: trade_huaxin.cancel_order(code, msg="再次撤单")) |
| | | # 不需要再次撤单了 |
| | | # try: |
| | | # cancel_buy_again(code) |
| | |
| | | except Exception as e: |
| | | # 状态还原 |
| | | CodesTradeStateManager().set_trade_state(code, trade_state) |
| | | logger_trade.error("{}撤单异常:{}".format(code, str(e))) |
| | | async_log_util.error(logger_trade, "{}撤单异常:{}".format(code, str(e))) |
| | | raise e |
| | | async_log_util.info(logger_trade, "{}撤单完毕".format(code)) |
| | | finally: |
| | | async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 结束".format(code)) |
| | | |
| | | |
| | | # 再次撤单,防止没有撤掉 |
| | |
| | | RedisUtils.delete(redis_l2, k, auto_free=False) |
| | | finally: |
| | | RedisUtils.realse(redis_l2) |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | if __name__ == "__main__": |