import base64 import concurrent.futures import copy import json import logging import threading import time import psutil import requests import huaxin_client.constant import constant import inited_data import outside_api_command_manager from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer from code_attribute import gpcode_manager, code_volumn_manager, zyltgb_util from code_attribute.code_data_util import ZYLTGBUtil from code_attribute.code_l1_data_manager import L1DataManager from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, WantBuyCodesManager from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from huaxin_client import l1_subscript_codes_manager from huaxin_client.client_network import SendResponseSkManager from l2 import l2_data_manager_new, l2_data_util, transaction_progress, \ l2_data_source_util, cancel_buy_strategy from l2.code_price_manager import Buy1PriceManager from l2.l2_data_manager import TradePointManager, OrderBeginPosInfo from l2.l2_data_util import L2DataUtil from l2.l2_transaction_data_manager import HuaXinBuyOrderManager, BigOrderDealManager from log_module import async_log_util, log_export from log_module.log import logger_debug, \ logger_trade, logger_trade_position_api_request, logger_request_api, \ logger_real_place_order_position, logger_device from output import l2_output_util from third_data import kpl_util, history_k_data_manager, huaxin_l1_data_manager, third_blocks_manager, kpl_data_manager from third_data.code_plate_key_manager import KPLCodeJXBlockManager, RealTimeKplMarketData from third_data.history_k_data_manager import HistoryKDataManager from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils from third_data.kpl_data_manager import KPLDataManager from third_data.kpl_limit_up_data_manager import CodeLimitUpSequenceManager from third_data.kpl_util import KPLDataType from third_data.third_blocks_manager import CodeThirdBlocksManager, SOURCE_TYPE_KPL, BlockMapManager from trade import trade_manager, l2_trade_util, trade_data_manager, trade_constant import l2_data_util as l2_data_util_old from trade.buy_money_count_setting import BuyMoneyAndCountSetting, RadicalBuyBlockCodeCountManager from trade.buy_radical import block_special_codes_manager from trade.huaxin import huaxin_trade_api, huaxin_trade_data_update, \ huaxin_trade_record_manager, huaxin_trade_order_processor, huaxin_sell_util from trade.huaxin.huaxin_trade_record_manager import PositionManager, DealRecordManager, DelegateRecordManager from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager from trade.sell import sell_manager from trade.sell.sell_rule_manager import TradeRuleManager, SellRule from trade.trade_data_manager import RadicalBuyDealCodesManager from trade.trade_manager import TradeTargetCodeModeManager, AutoCancelSellModeManager from settings.trade_setting import MarketSituationManager, TradeBlockBuyModeManager from utils import socket_util, data_export_util, tool, huaxin_util, output_util, global_util from servers import server_util class OutsideApiCommandCallback(outside_api_command_manager.ActionCallback): __cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8) __DealRecordManager = DealRecordManager() __code_sell_way_dict = {} @classmethod def __send_response(cls, data_bytes): sk = SendResponseSkManager.create_send_response_sk(addr=huaxin_client.constant.SERVER_IP, port=huaxin_client.constant.SERVER_PORT) try: data_bytes = socket_util.load_header(data_bytes) sk.sendall(data_bytes) result, header_str = socket_util.recv_data(sk) result = json.loads(result) if result["code"] != 0: raise Exception(result['msg']) finally: sk.close() def send_response(self, data, _client_id, _request_id): data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id, "request_id": _request_id}).encode('utf-8') for i in range(3): try: self.__send_response(data_bytes) # print("发送数据成功") break except Exception as e1: logging.exception(e1) # 撤长期没有成交的单 def __cancel_not_deal_order(self, code, order_ref, timeout=3): time.sleep(timeout) # 撤买单 huaxin_trade_api.cancel_order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, "", orderRef=order_ref) # 交易 def OnTrade(self, client_id, request_id, data): try: trade_type = data["trade_type"] if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER: code = data["code"] direction = data["direction"] volume = data["volume"] price_type = data["price_type"] price = data["price"] sinfo = data["sinfo"] if direction == 2: # price_type: 0-价格笼子 1-跌停价 2-涨停价 3-现价 4-买5价 async_log_util.info(logger_trade, f"API卖: 接收数据-{data}") current_price = L1DataManager.get_l1_current_price(code) limit_down_price = gpcode_manager.get_limit_down_price(code) limit_up_price = gpcode_manager.get_limit_up_price(code) order_ref = huaxin_util.create_order_ref() try: result = huaxin_sell_util.start_sell(code, volume, price_type, limit_up_price, limit_down_price, current_price, blocking=True, request_id=request_id, order_ref=order_ref) async_log_util.info(logger_trade, f"API卖结果: {result}") self.send_response(result, client_id, request_id) except Exception as e: if str(e).find("超时") >= 0: self.send_response({"code": 0, "data": {"orderRef": order_ref}}, client_id, request_id) else: raise e else: if not price: if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") < 0: # 开盘之前 limit_down_price = gpcode_manager.get_limit_down_price(code) if not limit_down_price: raise Exception("尚未获取跌停价") # 比跌停价高1分 price = round(float(limit_down_price) + 0.01, 2) else: # 开盘之后 # 没有传入价格,以最新价买入 current_price = L1DataManager.get_l1_current_price(code) if not current_price: raise Exception("尚未获取到现价") # 获取买1金额 price = round(float(current_price), 2) buy1_info = huaxin_l1_data_manager.get_buy1_info(code) if buy1_info and buy1_info[0] * buy1_info[1] > 50 * 10000: # 如果买1在50w以上就加一档 price += 0.01 limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price and price > float(limit_up_price): price = round(float(limit_up_price), 2) order_ref = huaxin_util.create_order_ref() result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type, sinfo=sinfo, order_ref=order_ref, blocking=True, request_id=request_id) # 2s内没成交就撤单 self.__cancel_sell_thread_pool.submit(self.__cancel_not_deal_order, code, order_ref) else: result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type, sinfo=sinfo, blocking=True, request_id=request_id) self.send_response({"code": 0, "data": result}, client_id, request_id) elif trade_type == outside_api_command_manager.TRADE_TYPE_CANCEL_ORDER: # print("手动撤单:", data) code = data["code"] direction = data["direction"] accountID = data["accountID"] orderSysID = data["orderSysID"] sinfo = data["sinfo"] async_log_util.info(logger_trade, f"API撤单: {data}") if orderSysID: result = huaxin_trade_api.cancel_order(direction, code, orderSysID, sinfo=sinfo, blocking=True, request_id=request_id) self.send_response({"code": 0, "data": result}, client_id, request_id) elif code: msg_list = [] try: sell_count = 0 sell_orders = huaxin_trade_order_processor.TradeResultProcessor.get_huaxin_sell_order_by_code( code) if sell_orders: for sell_order in sell_orders: if huaxin_util.is_can_cancel(sell_order.orderStatus): sell_count += 1 huaxin_trade_api.cancel_order(direction, code, sell_order.orderRef, blocking=False) msg_list.append(f"撤卖单数量:{sell_count}") except Exception as e: logger_debug.exception(e) can_cancel = l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤单", cancel_type=trade_constant.CANCEL_TYPE_HUMAN) if not can_cancel: msg_list.append(f"无法撤买单") else: msg_list.append(f"已撤买单") async_log_util.info(logger_trade, f"API撤单结果: {msg_list}") self.send_response({"code": 0, "data": {"code": 0, "msg": ";".join(msg_list)}}, client_id, request_id) except Exception as e: logger_debug.exception(e) self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) # 交易状态 def OnTradeState(self, client_id, request_id, data): try: operate = data["operate"] if operate == outside_api_command_manager.OPERRATE_SET: state = data["state"] if state: trade_manager.TradeStateManager().open_buy() else: trade_manager.TradeStateManager().close_buy() self.send_response({"code": 0, "msg": ("开启成功" if state else "关闭成功")}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_GET: can_buy = trade_manager.TradeStateManager().is_can_buy_cache() self.send_response({"code": 0, "data": {"can_buy": can_buy}}, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) # 交易模式 def OnTradeMode(self, client_id, request_id, data): try: operate = data["operate"] if operate == outside_api_command_manager.OPERRATE_SET: mode = data["mode"] TradeTargetCodeModeManager().set_mode(mode) self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_GET: mode = TradeTargetCodeModeManager().get_mode_cache() self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) def OnSellRule(self, client_id, request_id, data): try: operate = data["operate"] if operate == outside_api_command_manager.OPERRATE_ADD: data = data["data"] code = data["code"] type = data["type"] buy1_price = data.get("buy1_price") if not buy1_price: buy1_price = gpcode_manager.get_limit_up_price(code) if not buy1_price: raise Exception("尚未获取到涨停价") rule = SellRule(code=data["code"], buy1_volume=data["buy1_volume"], buy1_price=buy1_price, sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"), end_time=data["end_time"], type=type) TradeRuleManager().add_rule(rule) self.send_response({"code": 0, "data": {}}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_SET: data = data["data"] code = data["code"] buy1_price = data.get("buy1_price") if not buy1_price: buy1_price = gpcode_manager.get_limit_up_price(code) if not buy1_price: raise Exception("尚未获取到涨停价") rule = SellRule(id_=data["id"], code=data["code"], buy1_volume=data["buy1_volume"], buy1_price=buy1_price, sell_volume=data.get("sell_volume"), sell_price_type=data.get("sell_price_type"), end_time=data["end_time"]) TradeRuleManager().update_rule(rule) self.send_response({"code": 0, "data": {}}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_DELETE: data = data["data"] TradeRuleManager().del_rule(data["id"]) self.send_response({"code": 0, "data": {}}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_GET: rules = TradeRuleManager().list_rules() fresults = [] for rule in rules: fresults.append(rule.to_dict()) self.send_response({"code": 0, "data": fresults}, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) pass # 代码名单 def OnCodeList(self, client_id, request_id, data): try: code_list_type = data["code_list_type"] operate = data["operate"] code = data.get("code") if code and not tool.is_can_buy_code(code): raise Exception(f"不是可以交易的代码:{code}") fresult = {"code": 0} if code_list_type == outside_api_command_manager.CODE_LIST_WANT: if operate == outside_api_command_manager.OPERRATE_SET: gpcode_manager.WantBuyCodesManager().add_code(code) # 加想买单要从黑名单移除 l2_trade_util.remove_from_forbidden_trade_codes(code) name = gpcode_manager.get_code_name(code) if not name: results = HistoryKDatasUtils.get_gp_codes_names([code]) if results: gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) elif operate == outside_api_command_manager.OPERRATE_DELETE: gpcode_manager.WantBuyCodesManager().remove_code(code) elif operate == outside_api_command_manager.OPERRATE_GET: codes = gpcode_manager.WantBuyCodesManager().list_code_cache() datas = [] for code in codes: name = gpcode_manager.get_code_name(code) datas.append(f"{name}:{code}") fresult = {"code": 0, "data": datas} elif code_list_type == outside_api_command_manager.CODE_LIST_BLACK: if operate == outside_api_command_manager.OPERRATE_SET: # 先手动撤单 try: l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动拉黑", cancel_type=trade_constant.CANCEL_TYPE_HUMAN) except Exception as e: logger_debug.exception(e) l2_trade_util.forbidden_trade(code, msg="手动加入 trade_server") WantBuyCodesManager().remove_code(code) name = gpcode_manager.get_code_name(code) if not name: results = HistoryKDatasUtils.get_gp_codes_names([code]) if results: gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) elif operate == outside_api_command_manager.OPERRATE_DELETE: l2_trade_util.remove_from_forbidden_trade_codes(code) WantBuyCodesManager().add_code(code) elif operate == outside_api_command_manager.OPERRATE_GET: codes = gpcode_manager.BlackListCodeManager().list_codes_cache() datas = [] for code in codes: name = gpcode_manager.get_code_name(code) datas.append(f"{name}:{code}") fresult = {"code": 0, "data": datas} elif code_list_type == outside_api_command_manager.CODE_LIST_WHITE: if operate == outside_api_command_manager.OPERRATE_SET: gpcode_manager.WhiteListCodeManager().add_code(code) name = gpcode_manager.get_code_name(code) if not name: results = HistoryKDatasUtils.get_gp_codes_names([code]) if results: gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) elif operate == outside_api_command_manager.OPERRATE_DELETE: gpcode_manager.WhiteListCodeManager().remove_code(code) elif operate == outside_api_command_manager.OPERRATE_GET: codes = gpcode_manager.WhiteListCodeManager().list_codes_cache() datas = [] for code in codes: name = gpcode_manager.get_code_name(code) datas.append(f"{name}:{code}") fresult = {"code": 0, "data": datas} elif code_list_type == outside_api_command_manager.CODE_LIST_PAUSE_BUY: if operate == outside_api_command_manager.OPERRATE_SET: gpcode_manager.PauseBuyCodesManager().add_code(code) name = gpcode_manager.get_code_name(code) if not name: results = HistoryKDatasUtils.get_gp_codes_names([code]) if results: gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) elif operate == outside_api_command_manager.OPERRATE_DELETE: gpcode_manager.PauseBuyCodesManager().remove_code(code) elif operate == outside_api_command_manager.OPERRATE_GET: codes = gpcode_manager.PauseBuyCodesManager().list_code_cache() datas = [] for code in codes: name = gpcode_manager.get_code_name(code) datas.append(f"{name}:{code}") fresult = {"code": 0, "data": datas} elif code_list_type == outside_api_command_manager.CODE_LIST_MUST_BUY: if operate == outside_api_command_manager.OPERRATE_SET: gpcode_manager.MustBuyCodesManager().add_code(code) name = gpcode_manager.get_code_name(code) if not name: results = HistoryKDatasUtils.get_gp_codes_names([code]) if results: gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) elif operate == outside_api_command_manager.OPERRATE_DELETE: gpcode_manager.MustBuyCodesManager().remove_code(code) elif operate == outside_api_command_manager.OPERRATE_GET: codes = gpcode_manager.MustBuyCodesManager().list_code_cache() datas = [] for code in codes: name = gpcode_manager.get_code_name(code) datas.append(f"{name}:{code}") fresult = {"code": 0, "data": datas} elif code_list_type == outside_api_command_manager.CODE_LIST_GREEN: if operate == outside_api_command_manager.OPERRATE_SET: gpcode_manager.GreenListCodeManager().add_code(code) gpcode_manager.WantBuyCodesManager().add_code(code) name = gpcode_manager.get_code_name(code) if not name: results = HistoryKDatasUtils.get_gp_codes_names([code]) if results: gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) elif operate == outside_api_command_manager.OPERRATE_DELETE: gpcode_manager.GreenListCodeManager().remove_code(code) gpcode_manager.WantBuyCodesManager().remove_code(code) elif operate == outside_api_command_manager.OPERRATE_GET: codes = gpcode_manager.GreenListCodeManager().list_codes_cache() datas = [] for code in codes: name = gpcode_manager.get_code_name(code) datas.append(f"{name}:{code}") fresult = {"code": 0, "data": datas} self.send_response(fresult, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) def OnExportL2(self, client_id, request_id, data): try: code = data["code"] excel_file_name = data_export_util.export_l2_excel(code) # print("导出L2数据目录:", excel_file_name) self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id) except Exception as e: logging.exception(e) self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) def OnEveryDayInit(self, client_id, request_id, data): try: inited_data.everyday_init() self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) def OnRefreshTradeData(self, client_id, request_id, data): try: sync_type = data["ctype"] if sync_type == "delegate_list": huaxin_trade_data_update.add_delegate_list("API主动请求") elif sync_type == "deal_list": huaxin_trade_data_update.add_deal_list() elif sync_type == "money": huaxin_trade_data_update.add_money_list() elif sync_type == "position_list": huaxin_trade_data_update.add_position_list() self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) def OnGetCodeAttribute(self, client_id, request_id, data): try: code = data["code"] # 查询是否想买单/白名单/黑名单/暂不买 code_name = gpcode_manager.get_code_name(code) want = gpcode_manager.WantBuyCodesManager().is_in_cache(code) white = gpcode_manager.WhiteListCodeManager().is_in_cache(code) black = l2_trade_util.is_in_forbidden_trade_codes(code) pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code) must_buy = gpcode_manager.MustBuyCodesManager().is_in_cache(code) green = gpcode_manager.GreenListCodeManager().is_in_cache(code) desc_list = [] if want: desc_list.append("【想买单】") if white: desc_list.append("【白名单】") if black: desc_list.append("【黑名单】") if pause_buy: desc_list.append("【暂不买】") if must_buy: desc_list.append("【红名单】") if green: desc_list.append("【绿名单】") result = {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}} self.send_response(result, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) def OnGetCodeTradeState(self, client_id, request_id, data): try: code = data["code"] state = trade_manager.CodesTradeStateManager().get_trade_state(code) result = {"code": 0, "data": {"state": state}} self.send_response(result, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) def OnGetEnvInfo(self, client_id, request_id, data): try: fdata = {} try: date = JueJinApi.get_previous_trading_date(tool.get_now_date_str()) if date: fdata["juejin"] = 1 except Exception as e: fdata["juejin"] = 0 fdata["kpl"] = {} # 获取开盘啦数据 kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value, KPLDataType.INDUSTRY_RANK.value] for kpl_type in kpl_types: if kpl_type in KPLDataManager.kpl_data_update_info: fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type) try: # 验证redis RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test") fdata["redis"] = 1 except: fdata["redis"] = 0 try: # 验证mysql mysql_data.Mysqldb().select_one("select 1") fdata["mysql"] = 1 except: fdata["mysql"] = 0 try: # redis异步任务数量 fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count() except: pass # 获取交易通道 try: can_access = huaxin_trade_api.test_trade_channel() fdata["trade_channel_access"] = 1 if can_access else 0 except Exception as e: logger_debug.exception(e) fdata["trade_channel_access"] = 0 # 获取CPU与内存适用情况 memory_info = psutil.virtual_memory() cpu_percent = psutil.cpu_percent(interval=1) fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent} logger_device.info(fdata["device"]) # 获取今日自由流通量的更新 try: count = ZYLTGBUtil.count_today_updated_volume_codes() fdata["today_zylt_updated_count"] = count except Exception as e: logger_debug.exception(e) fdata["today_zylt_updated_count"] = -1 # 获取今日K线的更新数量 try: dates = HistoryKDatasUtils.get_latest_trading_date_cache(5) latest_trading_date = None if dates: latest_trading_date = dates[0] if latest_trading_date is None: raise Exception("没有获取到上一个交易日的日期") codes = HistoryKDataManager().get_history_bars_codes(latest_trading_date) count = len(codes) fdata["today_history_k_bar_count"] = count except Exception as e: logger_debug.exception(e) fdata["today_history_k_bar_count"] = -1 # 获取数据服务器是否联通 try: is_data_server_open = socket_util.is_port_bind(9004) fdata["data_server_open"] = 1 if is_data_server_open else 0 except Exception as e: logger_debug.exception(e) fdata["data_server_open"] = -1 # 获取交易通道 result = {"code": 0, "data": fdata, "msg": ""} # print("OnGetEnvInfo 成功") self.send_response(result, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) # 同步L2订阅代码 def OnSyncL2SubscriptCodes(self, client_id, request_id): logger_debug.debug("OnSyncL2SubscriptCodes") try: codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes() if codes_sh and codes_sz: l1_subscript_codes_manager.save_codes(codes_sh, codes_sz) result = {"code": 0, "data": {"codes_sh": len(codes_sh), "codes_sz": len(codes_sz)}} self.send_response(result, client_id, request_id) # 拉取三方板块 codes = [] codes.extend(codes_sh) codes.extend(codes_sz) threading.Thread(target=third_blocks_manager.load_if_less, args=(codes,), daemon=True).start() except Exception as e: logger_debug.error(e) def OnSystemLog(self, client_id, request_id, data): try: start_index = data["start_index"] count = data["count"] # 读取系统日志 logs_data = log_export.load_system_log() total_count = len(logs_data) if start_index >= 0: logs_data = logs_data[start_index:start_index + count] result = {"code": 0, "data": {"total_count": total_count, "list": logs_data}} self.send_response(result, client_id, request_id) except Exception as e: logger_debug.error(e) def OnGetFromDataServer(self, client_id, request_id, data): path = data["path"] params = data["params"] params_strs = [] if params: for k in params: params_strs.append(f"{k}={params[k]}") if params_strs: path += "?" path += "&".join(params_strs) try: # 获取参数 response = requests.get(f"http://127.0.0.1:9004{path}") if response.status_code == 200: self.send_response(response.text, client_id, request_id) else: self.send_response(json.dumps({"code": 1, "msg": f"网络请求状态错误:{response.status_code}"}), client_id, request_id) except: self.send_response(json.dumps({"code": 1, "msg": "网络请求出错"}), client_id, request_id) # 代码的交易信息 def OnGetCodeTradeInfo(self, client_id, request_id, data): try: code = data["code"] # 获取交易信息, # 获取正在成交的位置/获取下单位置/获取成交速率 total_datas = l2_data_util.local_today_datas.get(code) if total_datas is None: total_datas = [] trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code) trade_speed = transaction_progress.TradeBuyQueue().get_trade_speed(code) # 下单位置 place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code) fdata = {} if trade_index and place_order_index: # 有成交进度位与下单位 total_count = 0 total_money = 0 big_money_300_indexs = [] big_money_200_indexs = [] for i in range(trade_index, place_order_index): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue # 是否已经撤单 left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, l2_data_util.local_today_canceled_buyno_map.get( code)) if left_count <= 0: continue total_count += left_count money = val["num"] * int(val["price"] * 100) total_money += money if money >= 300 * 10000: big_money_300_indexs.append(i) elif money >= 200 * 10000: big_money_200_indexs.append(i) fdata["waiting_for_trade"] = f"{total_count}笔&{output_util.money_desc(total_money)}" total_count = 0 total_money = 0 for i in big_money_300_indexs: data = total_datas[i] val = data["val"] total_count += 1 money = val["num"] * int(val["price"] * 100) total_money += money fdata["big_num_300"] = {"desc": f"{total_count}笔&{output_util.money_desc(total_money)}", "datas": [output_util.format_l2_data(total_datas[x]) for x in big_money_300_indexs]} total_count = 0 total_money = 0 for i in big_money_200_indexs: data = total_datas[i] val = data["val"] total_count += 1 money = val["num"] * int(val["price"] * 100) total_money += money fdata["big_num_200"] = {"desc": f"{total_count}笔&{output_util.money_desc(total_money)}", "datas": [output_util.format_l2_data(total_datas[x]) for x in big_money_200_indexs]} if trade_speed: seconds = int(total_money / trade_speed) h = seconds // 3600 m = seconds % 3600 // 60 s = seconds % 60 fdata["trade_speed"] = f"{trade_speed}元/秒" fdata["trade_use_time"] = "{:0>2d}:{:0>2d}:{:0>2d}".format(h, m, s) fdata["trade_time"] = tool.trade_time_add_second(tool.get_now_time_str(), seconds) result = {"code": 0, "data": fdata} self.send_response(result, client_id, request_id) except Exception as e: logging.exception(e) self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id) def OnGetActiveListenCount(self, client_id, request_id): try: order = 0 # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER) transaction = 0 # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION) market = 0 # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET) result = {"code": 0, "data": {"order": order, "transaction": transaction, "market": market}} self.send_response(result, client_id, request_id) except Exception as e: logging.exception(e) self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id) def OnSaveRunningData(self, client_id, request_id): try: inited_data.save_running_data() except Exception as e: logging.exception(e) self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id) def OnGetCodePositionInfo(self, client_id, request_id, data): code = data.get("code") __start_time = time.time() try: if not tool.is_can_buy_code(code): raise Exception("非主板代码") # 获取代码基本信息 # 查询是否想买单/白名单/黑名单/暂不买 code_name = gpcode_manager.get_code_name(code) want = gpcode_manager.WantBuyCodesManager().is_in_cache(code) white = gpcode_manager.WhiteListCodeManager().is_in_cache(code) black = l2_trade_util.is_in_forbidden_trade_codes(code) pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code) desc_list = [] if want: desc_list.append("【想买单】") if white: desc_list.append("【白名单】") if black: desc_list.append("【黑名单】") if pause_buy: desc_list.append("【暂不买】") # 获取持仓 positions = PositionManager.latest_positions trade_rules_count = len(TradeRuleManager().list_can_excut_rules_cache()) fdata = {"code": code, "total": 0, "available": 0, "sell_orders": [], "sell_rules_count": trade_rules_count, "cost_price": 0, "cost_price_rate": 0, "code_info": (code, code_name), "desc": "".join(desc_list)} if positions: for d in positions: code_name = gpcode_manager.get_code_name(d["securityID"]) if not code_name: # 判断是否有名称 results = HistoryKDatasUtils.get_gp_codes_names([d["securityID"]]) threading.Thread( target=CodesNameManager.add_first_code_name(d["securityID"], results[d["securityID"]])).start() if d["prePosition"] <= 0: continue if d["securityID"] != code: continue fdata["total"] = d["prePosition"] fdata["available"] = d["availablePosition"] fdata["cost_price"] = round(float(d["historyPosPrice"]), 2) deal_order_system_id_infos = {} # 获取已经卖的单数 deal_list = self.__DealRecordManager.list_sell_by_code_cache(code) if deal_list: for d in deal_list: if d["orderSysID"] not in deal_order_system_id_infos: deal_order_system_id_infos[d["orderSysID"]] = [d["volume"], d["tradeTime"]] else: deal_order_system_id_infos[d["orderSysID"]][0] += d["volume"] # 获取9:30之前的卖委托 current_delegates = DelegateRecordManager().list_current_delegates(code) if current_delegates: for d in current_delegates: if d["orderSysID"] not in deal_order_system_id_infos: deal_order_system_id_infos[d["orderSysID"]] = [d["volume"], d["insertTime"]] deal_list = [deal_order_system_id_infos[k] for k in deal_order_system_id_infos] deal_list.sort(key=lambda x: x[1]) # TODO 测试 # deal_list.clear() # fdata["available"] = fdata["total"] fdata["sell_orders"] = [k[0] for k in deal_list] break # 有现价就获取现价 current_price = L1DataManager.get_l1_current_price(code) if current_price: fdata["cost_price"] = current_price pre_close_price = CodePrePriceManager.get_price_pre_cache(code) if current_price and pre_close_price: rate = round((float(current_price) - float(pre_close_price)) / float(pre_close_price), 4) fdata["cost_price_rate"] = rate # 获取涨幅 async_log_util.info(logger_trade_position_api_request, f"{fdata}") result = {"code": 0, "data": fdata} self.send_response(result, client_id, request_id) except Exception as e: logging.exception(e) self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id) finally: use_time = time.time() - __start_time if use_time > 0.01: # 耗时10ms以上才记录日志 async_log_util.info(logger_trade_position_api_request, f"{code}请求持仓耗时:{use_time * 1000}ms") def OnCommonRequest(self, client_id, request_id, data): # 通用请求 ctype = data["ctype"] __start_time = time.time() try: if ctype == "get_sell_result": order_ref = data["order_ref"] order_entity = huaxin_trade_order_processor.TradeResultProcessor.get_huaxin_order_by_order_ref( order_ref) result = {} if not order_entity: result = {"code": 1, "msg": f"没有获取到订单状态"} else: code_name = gpcode_manager.get_code_name(order_entity.code) result = {} if huaxin_util.is_canceled(order_entity.orderStatus): result = {"code": 0, "data": {"orderStatus": order_entity.orderStatus, "code": order_entity.code, "msg": f"【{order_entity.code}({code_name})】已撤单"}} elif huaxin_util.is_deal(order_entity.orderStatus): result = {"code": 0, "data": {"orderStatus": order_entity.orderStatus, "code": order_entity.code, "msg": f"【{order_entity.code}({code_name})】已经成交"}} else: result = {"code": 0, "data": {"orderStatus": order_entity.orderStatus, "code": order_entity.code, "msg": f"【{order_entity.code}({code_name})】已挂单"}} self.send_response(result, client_id, request_id) elif ctype == "get_position_codes": # 获取今日可卖的持仓代码 codes = PositionManager.get_position_codes() result = {"code": 0, "data": codes} self.send_response(result, client_id, request_id) elif ctype == "market_situation": try: operate = data["operate"] if operate == outside_api_command_manager.OPERRATE_SET: situation = data["situation"] MarketSituationManager().set_situation(situation) self.send_response({"code": 0, "data": {"situation": situation}}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_GET: situation = MarketSituationManager().get_situation_cache() self.send_response({"code": 0, "data": {"situation": situation}}, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) elif ctype == "get_kpl_limit_up_datas": # 获取开盘啦涨停队列 try: datas = kpl_data_manager.KPLDataManager.get_from_file(kpl_util.KPLDataType.LIMIT_UP, tool.get_now_date_str()) self.send_response({"code": 0, "data": datas}, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) elif ctype == "get_delegated_buy_code_infos": account_available_money = trade_data_manager.AccountMoneyManager().get_available_money_cache() # 获取委托中的代码 # current_delegates = huaxin_trade_record_manager.DelegateRecordManager().list_current_delegates() current_delegates, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day( tool.get_now_date_str("%Y%m%d"), None, [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded]) fdatas = [] if current_delegates: for c in current_delegates: try: if int(c["direction"]) != huaxin_util.TORA_TSTP_D_Buy: continue code = c["securityID"] orderSysID = c.get("orderSysID") code_name = gpcode_manager.get_code_name(code) # 获取下单位置信息 order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code) if order_begin_pos is None or order_begin_pos.buy_single_index is None: order_begin_pos = OrderBeginPosInfo(buy_single_index=0, buy_exec_index=0) l2_data_util.load_l2_data(code) total_datas = l2_data_util.local_today_datas.get(code) if not total_datas: continue trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code) if trade_index is None: trade_index = 0 # 下单位置 place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code) if place_order_index is None: place_order_index = 0 # 计算信号位置到真实下单位置的总买(不管是否已撤) total_nums = 0 for i in range(order_begin_pos.buy_single_index, place_order_index): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue total_nums += val["num"] # 计算已成交/已撤单的数量 deal_or_cancel_num = 0 for i in range(order_begin_pos.buy_single_index, trade_index + 1): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue deal_or_cancel_num += val["num"] # 获取剩下的笔数 total_left_count = 0 total_left_num = 0 for i in range(trade_index + 1, place_order_index): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] * float(val["price"]) < 5000: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2( code, i, total_datas, l2_data_util.local_today_canceled_buyno_map.get( code)) if left_count > 0: total_left_count += left_count total_left_num += val["num"] * left_count # 获取正在成交 dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code) if dealing_info: if str(total_datas[trade_index]["val"]["orderNo"]) == str(dealing_info[0]): total_left_num += (total_datas[trade_index]["val"]["num"] - dealing_info[1] // 100) limit_up_price = gpcode_manager.get_limit_up_price(code) buy1_money = Buy1PriceManager().get_latest_buy1_money(code) if buy1_money is None: buy1_money = 1 # 获取已经成交的大单数量 total_big_num = 0 total_big_count = 0 is_ge_code = tool.is_ge_code(code) for i in range(0, trade_index): val = total_datas[i]["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue # 是不是大单 if not l2_data_util_old.is_big_money(val, is_ge_code): continue canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2( code, i, total_datas, l2_data_util.local_today_canceled_buyno_map.get( code)) if not canceled_data: total_big_count += 1 else: total_big_num -= canceled_data["val"]["num"] total_big_num += val["num"] not_deal_total_big_num_pre = 0 not_deal_total_big_count_pre = 0 not_deal_total_big_num_after = 0 not_deal_total_big_count_after = 0 is_ge_code = tool.is_ge_code(code) for i in range(trade_index, total_datas[-1]["index"] + 1): val = total_datas[i]["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue # 是不是大单 if not l2_data_util_old.is_big_money(val, is_ge_code): continue canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2( code, i, total_datas, l2_data_util.local_today_canceled_buyno_map.get( code)) if not canceled_data: if i < place_order_index: not_deal_total_big_count_pre += 1 else: not_deal_total_big_count_after += 1 else: if i < place_order_index: not_deal_total_big_num_pre -= canceled_data["val"]["num"] else: not_deal_total_big_num_after -= canceled_data["val"]["num"] if i < place_order_index: not_deal_total_big_num_pre += val["num"] else: not_deal_total_big_num_after += val["num"] real_place_order_after_count = 0 real_place_order_after_num = 0 is_ge_code = tool.is_ge_code(code) # 统计真实下单位置后面未撤的金额 for i in range(place_order_index, total_datas[-1]["index"]): val = total_datas[i]["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue # 是不是大单 if not l2_data_util_old.is_big_money(val, is_ge_code): continue canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2( code, i, total_datas, l2_data_util.local_today_canceled_buyno_map.get( code)) if not canceled_data: real_place_order_after_count += 1 real_place_order_after_num += val["num"] # 获取当日的量比 volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code) # 是否需要注意 need_pay_attention = (total_left_count <= 10 or total_left_num * float( limit_up_price) * 100 < 1500 * 10000) and ( real_place_order_after_count <= 10 or real_place_order_after_num * float( limit_up_price) * 100 < 1500 * 10000) # 统计真实下单位是否距离大单位置过近 is_near_big_order = False try: count = 0 for i in range(place_order_index - 1, -1, -1): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue money = val["num"] * float(val["price"]) if money < 50 * 100: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2( code, i, total_datas, l2_data_util.local_today_canceled_buyno_map.get( code)) if left_count <= 0: continue if money >= 299 * 100: if count < 1: is_near_big_order = True else: count += 1 if count >= 1: break except: pass fdata = {"id": orderSysID, "code_info": (code, code_name), "total_num": total_nums, "finish_num": deal_or_cancel_num, "buy1_money": output_util.money_desc(buy1_money), "big_num_count": total_big_count, "big_num_money": output_util.money_desc( total_big_num * float(limit_up_price) * 100), "not_deal_big_num_count": ( not_deal_total_big_count_pre, not_deal_total_big_count_after), "not_deal_big_num_money": (output_util.money_desc( not_deal_total_big_num_pre * float(limit_up_price) * 100), output_util.money_desc( not_deal_total_big_num_after * float( limit_up_price) * 100)), "left_count": total_left_count, "volume_rate": volume_rate, "left_money": output_util.money_desc(total_left_num * float(limit_up_price) * 100), "pay_attention": need_pay_attention, "trade_progress_percent": round( total_left_num * float(limit_up_price) * 100 * 100 / buy1_money, 2), # 成交进度比例 "limit_up_price": gpcode_manager.get_limit_up_price_as_num(code), "is_near_big_order": is_near_big_order, "block": '', "trade_queue": [] } limit_up_data = kpl_data_manager.KPLLimitUpDataRecordManager.record_code_dict.get(code) # 获取当前板块 try: limit_up_sequences = CodeLimitUpSequenceManager.get_current_limit_up_sequence(code) if limit_up_sequences: buy_blocks = RadicalBuyDealCodesManager().get_code_blocks(code) blocks_info = [] for limit_up_sequence in limit_up_sequences: # 获取代码下单的板块 if buy_blocks and limit_up_sequence[0] not in buy_blocks: continue blocks_info.append( f"{limit_up_sequence[0]}-{limit_up_sequence[1]}({limit_up_sequence[2]}&{limit_up_sequence[2] - limit_up_sequence[3]})") if buy_blocks: fdata['block'] = "/".join(blocks_info) except: pass # 获取涨停时间 if limit_up_data: fdata['limit_up_time'] = tool.to_time_str(limit_up_data[2]) # 获取委托队列 try: real_place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code) if real_place_order_index is not None: trade_queue = l2_output_util.get_trade_queue_at_near_place_order(code, real_place_order_index, 9) fdata['trade_queue'] = trade_queue # 自由流通股本 zyltgb = global_util.zyltgb_map.get(code) if zyltgb is not None: fdata['zyltgb'] = output_util.money_desc(zyltgb) except: pass try: if order_begin_pos: fdata['mode'] = order_begin_pos.mode else: fdata['mode'] = -1 except: pass fdatas.append(fdata) except Exception as e: logger_debug.exception(e) result = {"code": 0, "data": {"account_available_money": account_available_money, "delegates": fdatas}} self.send_response(result, client_id, request_id) elif ctype == "set_real_place_order_index": # 设置真实下单位置 code = data["code"] real_order_index = data["index"] order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code) if order_begin_pos is None or order_begin_pos.buy_exec_index is None or order_begin_pos.buy_exec_index < 0: raise Exception("尚未下单") cancel_buy_strategy.set_real_place_position(code, real_order_index, buy_single_index=order_begin_pos.buy_single_index, is_default=False) # 更新日志 async_log_util.info(logger_real_place_order_position, f"真实下单位置(矫正):{code}-{real_order_index}") result = {"code": 0, "data": {}} self.send_response(result, client_id, request_id) elif ctype == "get_positions": # 获取所有持仓信息 positions = PositionManager.latest_positions fdatas = [] if positions: for d in positions: code_ = d["securityID"] code_name = gpcode_manager.get_code_name(d["securityID"]) if not code_name: # 判断是否有名称 results = HistoryKDatasUtils.get_gp_codes_names([code_]) threading.Thread( target=CodesNameManager.add_first_code_name(code_, results[code_])).start() if d["prePosition"] <= 0: continue fdatas.append({"code": code_, "code_name": code_name, "total": d["prePosition"], "available": d["availablePosition"]}) result = {"code": 0, "data": fdatas} self.send_response(result, client_id, request_id) elif ctype == "set_code_sell_way": # 设置卖出方式 # mode : 1-均分 2-百分比 sell_manager.set_code_sell_way(data) result = {"code": 0, "data": {}} self.send_response(result, client_id, request_id) elif ctype == "get_buy1_info": # 获取代码的买1信息 code = data["code"] results = HistoryKDatasUtils.get_gp_current_info([code]) item = results[0]["quotes"][0] result = {"code": 0, "data": {"price": item["bid_p"], "volume": item["bid_v"]}} self.send_response(result, client_id, request_id) elif ctype == "auto_cancel_sell_mode": try: operate = data["operate"] if operate == outside_api_command_manager.OPERRATE_SET: mode = data["mode"] AutoCancelSellModeManager().set_mode(mode) self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id) elif operate == outside_api_command_manager.OPERRATE_GET: sell_mode = AutoCancelSellModeManager().get_mode() self.send_response({"code": 0, "data": {"mode": sell_mode}}, client_id, request_id) except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) elif ctype == "set_per_code_buy_money": # 设置单只票的买入金额 money = data["money"] if money > 50000: raise Exception("最多只能设置5w") constant.BUY_MONEY_PER_CODE = money self.send_response({"code": 0, "data": {"money": constant.BUY_MONEY_PER_CODE}}, client_id, request_id) elif ctype == "get_per_code_buy_money": self.send_response({"code": 0, "data": {"money": constant.BUY_MONEY_PER_CODE}}, client_id, request_id) elif ctype == "repaire_task": # 修复开盘啦任务 kpl_data_manager.PullTask.repaire_pull_task() # 修复数据服务 server_util.repaire_data_server() # 任务修复 huaxin_trade_data_update.repaire_task() self.send_response({"code": 0, "data": {}}, client_id, request_id) elif ctype == "get_trade_queue": code = data["code"] count = data.get("count") if count is None: count = 100 real_place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code) trade_queue = l2_output_util.get_trade_queue(code, real_place_order_index, count) self.send_response({"code": 0, "data": trade_queue}, client_id, request_id) elif ctype == "get_deal_big_money_list": # 获取大单成交列表 code = data["code"] data_list = BigOrderDealManager().get_total_buy_money_list(code) bigger_money = l2_data_util_old.get_big_money_val(gpcode_manager.get_limit_up_price_as_num(code), tool.is_ge_code(code)) fdatas = [] for d in data_list: if d < bigger_money: continue fdatas.append(d) results = [output_util.money_desc(d) for d in fdatas] self.send_response({"code": 0, "data": results}, client_id, request_id) elif ctype == "refresh_zylt_volume": update_count = zyltgb_util.update_all_zylt_volumes() self.send_response({"code": 0, "data": {}, "msg": f"更新代码数量:{update_count}"}, client_id, request_id) elif ctype == "get_today_updated_zylt_volume_count": # 获取今日已经更新的自由流通量的代码数量 count = ZYLTGBUtil.count_today_updated_volume_codes() self.send_response({"code": 0, "data": {"count": count}}, client_id, request_id) # 更新代码的K线 elif ctype == "update_history_k_bars": # 更新历史K线 count = history_k_data_manager.update_history_k_bars() self.send_response({"code": 0, "data": {"count": count}, "msg": f"需要更新K线代码数量:{count}"}, client_id, request_id) elif ctype == "get_buy_block_mode": # 获取买入板块的模式 can_buy_unique_block = TradeBlockBuyModeManager().can_buy_unique_block() self.send_response({"code": 0, "data": {"unique_block": 1 if can_buy_unique_block else 0}, "msg": f""}, client_id, request_id) elif ctype == "set_buy_unique_block_mode": mode = data["mode"] # 添加独苗买入模式 if mode > 0: TradeBlockBuyModeManager().add_unique_block() else: TradeBlockBuyModeManager().remove_unique_block() self.send_response({"code": 0, "data": {}, "msg": f""}, client_id, request_id) elif ctype == "get_code_third_blocks": # 获取第三方板块数据 code = data["code"] source_dict = copy.deepcopy(CodeThirdBlocksManager().get_source_blocks(code)) if not source_dict: source_dict = {} source_origin_dict = copy.deepcopy(CodeThirdBlocksManager().get_source_blocks_origin(code)) if not source_origin_dict: source_origin_dict = {} kpl_blocks = set() if kpl_blocks is None: kpl_blocks = set() filter_blocks, match_blocks = RadicalBuyBlockManager.get_code_blocks(code) source_origin_dict[SOURCE_TYPE_KPL] = kpl_blocks source_dict[SOURCE_TYPE_KPL] = BlockMapManager().filter_blocks(kpl_blocks) data = { "blocks": {}, "origin_blocks": {}, "match_blocks": [list(filter_blocks), list(match_blocks)], # 板块净流入情况 "block_in_moneys": [RealTimeKplMarketData.get_block_info_at_block_in(b) for b in filter_blocks] } for s in source_origin_dict: data["origin_blocks"][s] = list(source_origin_dict[s]) for s in source_dict: data["blocks"][s] = list(source_dict[s]) self.send_response({"code": 0, "data": data, "msg": f""}, client_id, request_id) elif ctype == "set_buy_money_count_setting": # 设置买入金额和数量 normal = data["normal"] radical = data["radical"] default_buy_money = data["default_buy_money"] if int(default_buy_money) not in constant.AVAILABLE_BUY_MONEYS: raise Exception("默认金额不在预设金额内") constant.BUY_MONEY_PER_CODE = default_buy_money BuyMoneyAndCountSetting().set_normal_buy_data(normal[0], json.loads(normal[1])) BuyMoneyAndCountSetting().set_radical_buy_data(radical[0], json.loads(radical[1])) data = { "normal": BuyMoneyAndCountSetting().get_normal_buy_setting(), "radical": BuyMoneyAndCountSetting().get_radical_buy_setting() } self.send_response({"code": 0, "data": data, "msg": f""}, client_id, request_id) elif ctype == "get_buy_money_count_setting": # 设置买入金额和数量 data = { "normal": BuyMoneyAndCountSetting().get_normal_buy_setting(), "radical": BuyMoneyAndCountSetting().get_radical_buy_setting(), "moneys": constant.AVAILABLE_BUY_MONEYS, "default_buy_money": constant.BUY_MONEY_PER_CODE } self.send_response({"code": 0, "data": data, "msg": f""}, client_id, request_id) elif ctype == "set_radical_buy_block_count_setting": # 设置买入金额和数量 data_str = data["data"] data_str = base64.b64decode(data_str).decode('utf-8') setting = json.loads(data_str) RadicalBuyBlockCodeCountManager().set_block_code_count(setting) self.send_response({"code": 0, "data": setting, "msg": f""}, client_id, request_id) elif ctype == "get_radical_buy_block_count_setting": # 设置买入金额和数量 data = RadicalBuyBlockCodeCountManager().get_block_code_count_settings() self.send_response({"code": 0, "data": data, "msg": f""}, client_id, request_id) elif ctype == "get_place_order_settings": # 获取买入下单设置 data = { "radical_buy": {"price": (constant.MIN_CODE_RADICAL_BUY_PRICE, constant.MAX_CODE_RADICAL_BUY_PRICE), "zyltgb": constant.RADICAL_BUY_ZYLTGB_AS_YI_RANGES, "top_block_count_by_market_strong":constant.RADICAL_BUY_TOP_IN_COUNT_BY_MARKET_STRONG, "special_codes_max_block_in_rank": constant.RADICAL_BUY_TOP_IN_INDEX_WITH_SPECIAL }} self.send_response({"code": 0, "data": data, "msg": f""}, client_id, request_id) elif ctype == "set_place_order_settings": radical_buy = data.get("radical_buy") if radical_buy: radical_buy = json.loads(radical_buy) constant.MIN_CODE_RADICAL_BUY_PRICE = radical_buy["price"][0] constant.MAX_CODE_RADICAL_BUY_PRICE = radical_buy["price"][1] constant.RADICAL_BUY_ZYLTGB_AS_YI_RANGES = radical_buy["zyltgb"] if radical_buy.get("top_block_count_by_market_strong"): constant.RADICAL_BUY_TOP_IN_COUNT_BY_MARKET_STRONG = radical_buy.get("top_block_count_by_market_strong") if radical_buy.get("special_codes_max_block_in_rank"): constant.RADICAL_BUY_TOP_IN_INDEX_WITH_SPECIAL = radical_buy.get( "special_codes_max_block_in_rank") self.send_response({"code": 0, "data": {}, "msg": f""}, client_id, request_id) elif ctype == "get_buy_open_limit_up_codes": # 获取隔夜单排1的代码 codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes() if not codes: codes = set() self.send_response({"code": 0, "data": list(codes), "msg": f""}, client_id, request_id) elif ctype == "set_buy_open_limit_up_codes": # 设置隔夜单排1的代码 codes = data.get("codes") codes = json.loads(codes) gpcode_manager.BuyOpenLimitUpCodeManager().set_codes(set(codes)) self.send_response({"code": 0, "data": list(codes), "msg": f""}, client_id, request_id) elif ctype == "async_radical_buy_special_codes": # 同步扫入买的辨识度代码 count = block_special_codes_manager.update_block_special_codes() self.send_response({"code": 0, "msg": f"更新成功数量:{count}"}, client_id, request_id) except Exception as e: logging.exception(e) logger_debug.exception(e) self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id) finally: use_time = time.time() - __start_time if use_time > 5: logger_request_api.info(f"common_request请求时间过长,ctype-{ctype}")