""" 可转债入口函数 """ import datetime import json import logging import multiprocessing import threading import time from code_attribute import target_codes_manager, gpcode_manager, code_market_manager, history_k_data_util from code_attribute.gpcode_manager import CodesNameManager from code_attribute.limit_up_time_manager import FirstLimitUpTimeManager from db.redis_manager_delegate import RedisUtils from huaxin_client import l2_client_for_cb from huaxin_client.client_network import SendResponseSkManager from log_module import async_log_util, log_export from records import huaxin_trade_record_manager from third_data import kpl_data_manager, kpl_util from third_data.kpl_data_manager import PullTask, KPLCodeJXBlockManager, KPLLimitUpDataRecordManager from trade import huaxin_trade_api, huaxin_trade_data_update, huaxin_sell_util, backtest_trade, buy_strategy, \ sell_strategy from trade.buy_strategy import BuyStrategyDataManager, StrategyBuyOrderRefManager from trade.trade_manager import CodeTradeStateManager from trade.trade_settings import WantBuyCodesManager, TradeStateManager from utils import middle_api_protocol, outside_api_command_manager, constant, tool, huaxin_util, socket_util, sell_util, \ output_util, l2_huaxin_util, output_data_util middle_api_protocol.SERVER_PORT = 10008 middle_api_protocol.SERVER_HOST = "43.138.167.68" constant.LOG_DIR = "logs_cb" from log_module.log import logger_debug, logger_trade, printlog import concurrent.futures __cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8) __BuyStrategyDataManager = BuyStrategyDataManager() def __send_response(data_bytes): sk = SendResponseSkManager.create_send_response_sk(addr=middle_api_protocol.SERVER_HOST, port=middle_api_protocol.SERVER_PORT) try: data_bytes = socket_util.load_header(data_bytes) sk.sendall(data_bytes) result, header_str = socket_util.recv_data(sk) result = json.loads(result) if result["code"] != 0: raise Exception(result['msg']) finally: sk.close() def send_response(data, _client_id, _request_id): data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id, "request_id": _request_id}).encode('utf-8') for i in range(3): try: __send_response(data_bytes) printlog("发送数据成功") break except Exception as e1: logging.exception(e1) # 撤长期没有成交的单 def __cancel_not_deal_order(code, order_ref, timeout=3): time.sleep(timeout) # 撤买单 huaxin_trade_api.cancel_order(1, code, "", orderRef=order_ref) def command_callback(client_id, request_id, data): """ 命令回调 :param client_id: :param request_id: :param data: json格式数据 :return: """ type_ = data.get('type') if type_ == outside_api_command_manager.API_TYPE_TRADE: try: trade_type = data["trade_type"] if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER: code = data["code"] direction = data["direction"] volume = data["volume"] price_type = data["price_type"] price = data["price"] sinfo = data["sinfo"] if direction == 2: # price_type: 0-价格笼子 1-跌停价 2-涨停价 3-现价 4-买5价 async_log_util.info(logger_trade, f"API卖: 接收数据-{data}") current_price = None market_info = code_market_manager.get_market_info(code) if market_info: current_price = market_info.price else: price_results = history_k_data_util.HistoryKDatasUtils.get_gp_current_info([code]) current_price = price_results[0]["price"] limit_down_price = target_codes_manager.get_limit_down_price(code) limit_up_price = target_codes_manager.get_limit_up_price(code) order_ref = huaxin_util.create_order_ref() try: result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price, limit_down_price, current_price, blocking=True, request_id=request_id, order_ref=order_ref) async_log_util.info(logger_trade, f"API卖结果: {result}") send_response(result, client_id, request_id) except Exception as e: if str(e).find("超时") >= 0: send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id) else: raise e finally: huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_deal_list() else: if not price: limit_down_price = target_codes_manager.get_limit_down_price(code) limit_up_price = target_codes_manager.get_limit_up_price(code) price = sell_util.get_sell_price(price_type, limit_up_price, limit_down_price, None) if not price: raise Exception("尚未获取到买入价格") # 获取买1金额 price = round(float(price), 3) order_ref = huaxin_util.create_order_ref() result = huaxin_trade_api.order(direction, code, volume, price, sinfo=sinfo, order_ref=order_ref, blocking=True, request_id=request_id) # 2s内没成交就撤单 __cancel_sell_thread_pool.submit(__cancel_not_deal_order, code, order_ref) else: result = huaxin_trade_api.order(direction, code, volume, price, sinfo=sinfo, blocking=True, request_id=request_id) huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_deal_list() send_response({"code": 0, "data": result}, client_id, request_id) except Exception as e: logger_debug.exception(e) send_response({"code": 1, "msg": str(e)}, client_id, request_id) elif type_ == "get_code_position_info": try: # 查询此仓 code = data.get("code") if code: results = huaxin_trade_record_manager.PositionManager().list_by_day(tool.get_now_date_str("%Y%m%d"), code) else: results = huaxin_trade_record_manager.PositionManager().get_from_cache() if constant.backtest_mode_info: results.clear() position_dict = backtest_trade.position_dict for p in position_dict: translation = position_dict[p] underlying_code = translation["SecurityID"] # TODO 获取当时的价格 translation["buy_list"] = [{"price": str(translation['TradePrice']), "tradeTime": l2_huaxin_util.convert_time(translation['OrderTime']), "volume": 10}] cb_code = target_codes_manager.get_cb_code(underlying_code) underlying_market = code_market_manager.get_market_info(underlying_code) results.append( {"securityID": cb_code, "securityName": gpcode_manager.CodesNameManager().get_code_name(cb_code), "buy_list": translation["buy_list"], "sell_list": [], "currentPosition": 10, "id": cb_code}) for r in results: cb_code = r["securityID"] underlying_code = target_codes_manager.get_underlying_code(cb_code) cb_market = code_market_manager.get_market_info(cb_code) underlying_market = code_market_manager.get_market_info(underlying_code) if cb_market: r["marketInfo"] = {"code": cb_market.code, "name": r["securityName"], "rate": f"{'+' if cb_market.rate > 0.0001 else ''}{round(cb_market.rate * 100, 2)}%", "price": cb_market.price, "lastVolume": cb_market.total_bid_volume // 100, "buy1Money": output_util.money_desc( cb_market.buy1_price * cb_market.buy1_volume), "preClosePrice": cb_market.pre_close_price} r["createTime"] = 0 if underlying_market: if not gpcode_manager.CodesNameManager().get_code_name(underlying_market.code): # 异步请求名称 threading.Thread( target=lambda: gpcode_manager.CodesNameManager().request_code_name(underlying_market.code), daemon=True).start() r["underlyingMarketInfo"] = {"code": underlying_market.code, "name": gpcode_manager.CodesNameManager().get_code_name( underlying_market.code), "rate": f"{'+' if underlying_market.rate > 0.0001 else ''}{round(underlying_market.rate * 100, 2)}%", "price": underlying_market.price, "lastVolume": underlying_market.total_bid_volume // 100, "buy1Money": output_util.money_desc( underlying_market.buy1_price * underlying_market.buy1_volume), "preClosePrice": underlying_market.pre_close_price} r["underlyingDetailInfo"] = output_data_util.load_code_detail_info(underlying_market.code) r["underlyingDetailInfo"]["price"] = underlying_market.price # 获取买点与卖点 if "buy_list" not in r: buys = huaxin_trade_record_manager.DealRecordManager().list_buy_by_code_cache(cb_code) # 根据orderSystemId聚合 temp_dict = {} for b in buys: orderSysID = b["orderSysID"] if orderSysID not in temp_dict: temp_dict[orderSysID] = [] temp_dict[orderSysID].append(b) r["buy_list"] = [] for k in temp_dict: volume = sum([x["volume"] for x in temp_dict[k]]) x = temp_dict[k][0] r["buy_list"].append({"price": str(x["price"]), "tradeTime": x["tradeTime"], "volume": volume, "type": StrategyBuyOrderRefManager().get_strategy_type(x["orderRef"])}) if buys: r["createTime"] = int(buys[0]["tradeTime"].replace(":", "")) if "sell_list" not in r: sells = huaxin_trade_record_manager.DealRecordManager().list_sell_by_code_cache(cb_code) temp_dict = {} for s in sells: orderSysID = s["orderSysID"] if orderSysID not in temp_dict: temp_dict[orderSysID] = [] temp_dict[orderSysID].append(s) r["sell_list"] = [] for k in temp_dict: volume = sum([x["volume"] for x in temp_dict[k]]) x = temp_dict[k][0] r["sell_list"].append({"price": str(x["price"]), "tradeTime": x["tradeTime"], "volume": volume}) send_response({"code": 0, "data": results}, client_id, request_id) except Exception as e: logger_debug.exception(e) send_response({"code": 1, "msg": str(e)}, client_id, request_id) elif type_ == "refresh_trade_data": # 刷新交易数据 ctype = data.get("ctype") if ctype == "money": huaxin_trade_data_update.add_money_list() elif ctype == "position_list": huaxin_trade_data_update.add_position_list() elif ctype == "deal_list": huaxin_trade_data_update.add_deal_list() elif ctype == "delegate_list": huaxin_trade_data_update.add_delegate_list("手动刷新") send_response({"code": 0, "data": {}}, client_id, request_id) elif type_ == outside_api_command_manager.API_TYPE_COMMON_REQUEST: # 常规接口 ctype = data['ctype'] if ctype == 'get_account_money': # 获取账户资金 result = huaxin_trade_record_manager.MoneyManager.get_data() send_response({"code": 0, "data": result}, client_id, request_id) elif ctype == 'set_backtest_mode': try: # 设置回测模式 date = data.get("date") mode = data.get("mode") if not date: date = tool.get_now_date_str() if mode: # 开始回撤 constant.backtest_mode_info = (True, date) threading.Thread(target=lambda: backtest_trade.start_backtest(date), daemon=True).start() else: # 结束回撤 constant.backtest_mode_info = None send_response({"code": 0, "data": {}}, client_id, request_id) except Exception as e: send_response({"code": 0, "data": {}, "msg": str(e)}, client_id, request_id) elif ctype == "want_buy_codes": try: operate = data["operate"] if operate == outside_api_command_manager.OPERRATE_ADD: code = data["code"] WantBuyCodesManager().add_code(code, buy_strategy.STRATEGY_TYPE_LIMIT_UP) WantBuyCodesManager().add_code(code, buy_strategy.STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS) send_response({"code": 0, "data": {}}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_GET: code_infos = WantBuyCodesManager().list_code() codes = set([x.split("-")[0] for x in code_infos]) code_infos = [(x, CodesNameManager.get_code_name(x)) for x in codes] send_response({"code": 0, "data": code_infos}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_DELETE: code = data["code"] WantBuyCodesManager().remove_code(code, buy_strategy.STRATEGY_TYPE_LIMIT_UP) WantBuyCodesManager().remove_code(code, buy_strategy.STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS) send_response({"code": 0, "data": {}}, client_id, request_id) except Exception as e: send_response({"code": 1, "msg": str(e)}, client_id, request_id) elif ctype == "trade_state": try: operate = data["operate"] if operate == outside_api_command_manager.OPERRATE_SET: state = data["state"] if state > 0: TradeStateManager().open_buy() else: TradeStateManager().close_buy() send_response({"code": 0, "data": {}}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_GET: state = 1 if TradeStateManager().is_can_buy() else 0 send_response({"code": 0, "data": {"state": state}}, client_id, request_id) except Exception as e: send_response({"code": 1, "msg": str(e)}, client_id, request_id) def test(): time.sleep(5) print("获取资金:", huaxin_trade_api.get_money()) # print("获取成交:", huaxin_trade_api.get_deal_list()) # print("下单:", huaxin_trade_api.order(1, "127075", 10, 140.5, blocking=True)) def __get_buy_money(strategy_type): time_str = tool.get_now_time_str().replace(":", "") if int(time_str) < int("103000"): money = 40000 elif int(time_str) < int("113000"): money = 30000 elif int(time_str) < int("140000"): money = 20000 else: money = 10000 if strategy_type == buy_strategy.STRATEGY_TYPE_RISE_HIGH_WITH_BLOCKS: return int(money * 0.8) elif strategy_type == buy_strategy.STRATEGY_TYPE_LIMIT_UP: return int(money * 0.2) else: return money def read_l2_results(trade_call_back_queue): while True: try: result = trade_call_back_queue.get() if result: # 获取可以买的代码 code, trade_time = result[0], result[1] buy_infos = result[3] for buy_info in buy_infos: if buy_info[0]: if not TradeStateManager().is_can_buy(): continue # 策略类型 strategy_type = buy_info[1] # 获取股票代码的可转债代码 cb_code = target_codes_manager.get_cb_code(code) if CodeTradeStateManager().get_trade_state(cb_code, strategy_type) == CodeTradeStateManager.TRADE_STATE_ALREADY_BUY and not WantBuyCodesManager().is_in_cache( cb_code, strategy_type): # 已经买了且没在想买单 continue # 如果之前就涨停了且没在想买单中 underlying_code = target_codes_manager.get_underlying_code(cb_code) underlying_limit_up_time = FirstLimitUpTimeManager().get_first_limit_up_time(underlying_code) if not WantBuyCodesManager().is_in_cache(cb_code, strategy_type) and underlying_limit_up_time: continue # 获取可转债的涨停价 market_info = code_market_manager.get_market_info(cb_code) limit_up_price = target_codes_manager.get_limit_up_price(cb_code) if market_info: if market_info.rate > 0.139: async_log_util.info(logger_trade, f"可转债涨幅过高::{cb_code}-{market_info.rate}") continue volume = int(__get_buy_money(strategy_type) / float(limit_up_price)) volume = (volume // 10) * 10 buy_price = round(min(float(market_info.price * 1.02), float(limit_up_price)), 3) async_log_util.info(logger_trade, f"准备下单:{cb_code}-{buy_price}-{buy_info}") # 买入20股 result = huaxin_trade_api.order(1, cb_code, volume, buy_price, blocking=True) if type(result) == dict and result['code'] == 0: orderRef = result['data']['orderRef'] StrategyBuyOrderRefManager().add(orderRef, strategy_type) CodeTradeStateManager().set_trade_state(cb_code, strategy_type, CodeTradeStateManager.TRADE_STATE_ALREADY_BUY) # 移除想买单 WantBuyCodesManager().remove_code(cb_code, strategy_type) async_log_util.info(logger_trade, f"可转债下单结果:{result}") huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_deal_list() except Exception as e: logger_debug.exception(e) time.sleep(1) finally: pass def __read_market_data(queue_market: multiprocessing.Queue): while True: try: result = queue_market.get() if result: # (代码, 最近的价格, 涨幅, 买1价, 买1量, 成交总量, 买入量, 卖出量, 昨日收盘价, 时间戳) code_market_manager.set_market_info(result) try: time_str = l2_huaxin_util.convert_time(result[9]) if int(time_str.replace(":", "")) < int("092455"): continue except Exception as e: pass code = result[0] # 正股需要加载板块 if tool.is_stock(code): # 正股代码 limit_up_price = tool.get_limit_up_price(code, result[8]) # 涨幅大于5%才开始获取板块 if result[2] > 0.05: KPLCodeJXBlockManager().load_jx_blocks(result[0], result[3], float(limit_up_price), KPLLimitUpDataRecordManager.get_current_reasons()) FirstLimitUpTimeManager().process(result) else: # 可转债代码,处理 sell_strategy.process_market_info(result, __BuyStrategyDataManager) except Exception as e: logger_debug.exception(e) time.sleep(1) finally: pass def __init_data(): """ 初始化参数 :return: """ try: market_dict = log_export.load_latest_market_info() for k in market_dict: code_market_manager.set_market_info(market_dict[k], with_log=False) except Exception as e: logger_debug.exception(e) def __kpl_limit_up_callback(results): if results: result_list = kpl_util.parseLimitUpData(results) kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list) if __name__ == '__main__': # ===========初始化数据========== try: target_codes_manager.load_valid_codes_info() except Exception as e: logger_debug.exception(e) __init_data() trade_call_back_queue = multiprocessing.Queue() # 华鑫交易数据更新 huaxin_trade_data_update.run() # 定时拉取开盘啦涨停数据 threading.Thread(target=lambda: PullTask.run_limit_up_task(__kpl_limit_up_callback), daemon=True).start() # 仿真交易不运行交易客户端 # # ===========运行交易外部API========== # # # # # 策略与交易通信队列 # # 交易结果读取, 交易命令队列与交易查询队列设置为同一个 # queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue() # huaxin_trade_api.run_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query) # # # # ===========运行交易端========== # tradeProcess = multiprocessing.Process( # target=trade_client_for_cb.run, # args=(queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_query, queue_strategy_r_trade_w,)) # tradeProcess.start() # ===========运行本地API接口========== # middle_api_protocol.SERVER_HOST = "192.168.3.122" manager = outside_api_command_manager.NewApiCommandManager() manager.init(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, command_callback, [("trade_cb", 50)]) manager.run(blocking=False) # threading.Thread(target=test, daemon=True).start() # ===========读取根据L2制定的买入策略========== threading.Thread(target=read_l2_results, args=(trade_call_back_queue,), daemon=True).start() # ===========异步日志持久化========== threading.Thread(target=async_log_util.run_sync, daemon=True).start() # ===========Redis缓存====================== threading.Thread(target=RedisUtils.run_loop, daemon=True).start() # 运行L2数据监听队列 queue_market = multiprocessing.Queue() threading.Thread(target=__read_market_data, args=(queue_market,), daemon=True).start() l2_client_for_cb.run(trade_call_back_queue, queue_market)