From 3ec79004bd769828c8dc18ed35280f81cfb473ff Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 08 二月 2023 19:30:45 +0800 Subject: [PATCH] 交易结果整理 --- server.py | 262 ++++++++++++++++++++++++++++------------------------ 1 files changed, 142 insertions(+), 120 deletions(-) diff --git a/server.py b/server.py index acb687c..cdec9e6 100644 --- a/server.py +++ b/server.py @@ -19,10 +19,10 @@ import gpcode_manager import authority import juejin -import l2_data_log -from l2 import l2_data_manager_new, l2_data_manager +from l2 import l2_data_manager_new, l2_data_manager, l2_data_log import l2_data_util from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil +import l2.l2_data_util import ths_industry_util import ths_util @@ -60,6 +60,7 @@ l2_save_time_dict = {} l2_trade_buy_queue_dict = {} tradeBuyQueue = l2.transaction_progress.TradeBuyQueue() + last_time = {} def setup(self): super().setup() # 鍙互涓嶈皟鐢ㄧ埗绫荤殑setup()鏂规硶锛岀埗绫荤殑setup鏂规硶浠�涔堥兘娌″仛 @@ -80,90 +81,101 @@ # print("- " * 30) sk: socket.socket = self.request while True: - data = sk.recv(1024 * 1024 * 20) + data = sk.recv(1024 * 100) if len(data) == 0: # print("瀹㈡埛绔柇寮�杩炴帴") break _str = str(data, encoding="gbk") if len(_str) > 0: # print("缁撴灉锛�",_str) - type = data_process.parseType(_str) + type = -1 + try: + type = data_process.parseType(_str) + except: + print(_str) return_str = "OK" if type == 0: - try: origin_start_time = round(time.time() * 1000) __start_time = round(time.time() * 1000) do_id = random.randint(0, 100000) - # level2鐩樺彛鏁版嵁 - day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data( + day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data( _str) - # 闂撮殧1s淇濆瓨涓�鏉2鐨勬渶鍚庝竴鏉℃暟鎹� - if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ - code] >= 1000 and len(datas) > 0: - self.l2_save_time_dict[code] = origin_start_time - logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1]) + if channel == 0: + now_time = round(time.time() * 1000) + if self.last_time.get(channel) is not None: + #print("鎺ュ彈鍒癓2鐨勬暟鎹�", channel, now_time - self.last_time.get(channel), "瑙f瀽鑰楁椂",now_time - origin_start_time) + pass - # 10ms鐨勭綉缁滀紶杈撳欢鏃� - capture_timestamp = __start_time - process_time - 10 - # print("鎴浘鏃堕棿锛�", process_time) - __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, - "鎴浘鏃堕棿锛歿} 鏁版嵁瑙f瀽鏃堕棿".format(process_time)) + self.last_time[channel] = now_time - cid, pid = gpcode_manager.get_listen_code_pos(code) + if True: + # 闂撮殧1s淇濆瓨涓�鏉2鐨勬渶鍚庝竴鏉℃暟鎹� + if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ + code] >= 1000 and len(datas) > 0: + self.l2_save_time_dict[code] = origin_start_time + logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1]) - __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, - "l2鑾峰彇浠g爜浣嶇疆鑰楁椂") - # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷� - if cid is not None and pid is not None and client == int(cid) and channel == int(pid): - try: - # 鏍¢獙瀹㈡埛绔唬鐮� - l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) - __start_time = round(time.time() * 1000) - if gpcode_manager.is_listen(code): - __start_time = l2_data_log.l2_time(code, do_id, - round(time.time() * 1000) - __start_time, - "l2澶栭儴鏁版嵁棰勫鐞嗚�楁椂") - l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp, - do_id) - __start_time = l2_data_log.l2_time(code, do_id, - round(time.time() * 1000) - __start_time, - "l2鏁版嵁鏈夋晥澶勭悊澶栭儴鑰楁椂", - False) - # 淇濆瓨鍘熷鏁版嵁鏁伴噺 - l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) - if round(time.time() * 1000) - __start_time > 20: - l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, - "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂", - False) + # 10ms鐨勭綉缁滀紶杈撳欢鏃� + capture_timestamp = __start_time - process_time - 10 + # print("鎴浘鏃堕棿锛�", process_time) + __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, + "鎴浘鏃堕棿锛歿} 鏁版嵁瑙f瀽鏃堕棿".format(process_time)) - except l2_data_manager.L2DataException as l: - # 鍗曚环涓嶇 - if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: - key = "{}-{}-{}".format(client, channel, code) - if key not in self.l2_data_error_dict or round( - time.time() * 1000) - self.l2_data_error_dict[key] > 10000: - # self.l2CodeOperate.repaire_l2_data(code) - # todo 澶晱鎰熺Щ闄や唬鐮� - logger_l2_error.warning("code-{} l2鍗曚环閿欒:{}", code, l.msg) - # 鍗曚环涓嶄竴鑷存椂闇�瑕佺Щ闄や唬鐮侀噸鏂版坊鍔� - l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2鐩戝惉鍗曚环閿欒") - self.l2_data_error_dict[key] = round(time.time() * 1000) + cid, pid = gpcode_manager.get_listen_code_pos(code) - except Exception as e: - print("寮傚父", str(e), code) - logging.exception(e) - logger_l2_error.error("鍑洪敊锛歿}".format(str(e))) - logger_l2_error.error("鍐呭锛歿}".format(_str)) - finally: + __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, + "l2鑾峰彇浠g爜浣嶇疆鑰楁椂") + # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷� + if cid is not None and pid is not None and client == int(cid) and channel == int(pid): + try: + # 鏍¢獙瀹㈡埛绔唬鐮� + l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) + __start_time = round(time.time() * 1000) + if gpcode_manager.is_listen(code): + __start_time = l2_data_log.l2_time(code, do_id, + round(time.time() * 1000) - __start_time, + "l2澶栭儴鏁版嵁棰勫鐞嗚�楁椂") + l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp, + do_id) + __start_time = l2_data_log.l2_time(code, do_id, + round(time.time() * 1000) - __start_time, + "l2鏁版嵁鏈夋晥澶勭悊澶栭儴鑰楁椂", + False) + # 淇濆瓨鍘熷鏁版嵁鏁伴噺 + l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) + if round(time.time() * 1000) - __start_time > 20: + l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, + "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂", + False) - __end_time = round(time.time() * 1000) - # 鍙褰曞ぇ浜�40ms鐨勬暟鎹� - if __end_time - origin_start_time > 100: - l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time, - "l2鏁版嵁澶勭悊鎬昏�楁椂", - True) + except l2_data_manager.L2DataException as l: + # 鍗曚环涓嶇 + if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: + key = "{}-{}-{}".format(client, channel, code) + if key not in self.l2_data_error_dict or round( + time.time() * 1000) - self.l2_data_error_dict[key] > 10000: + # self.l2CodeOperate.repaire_l2_data(code) + # todo 澶晱鎰熺Щ闄や唬鐮� + logger_l2_error.warning("code-{} l2鍗曚环閿欒:{}", code, l.msg) + # 鍗曚环涓嶄竴鑷存椂闇�瑕佺Щ闄や唬鐮侀噸鏂版坊鍔� + l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2鐩戝惉鍗曚环閿欒") + self.l2_data_error_dict[key] = round(time.time() * 1000) + + except Exception as e: + print("寮傚父", str(e), code) + logging.exception(e) + logger_l2_error.error("鍑洪敊锛歿}".format(str(e))) + logger_l2_error.error("鍐呭锛歿}".format(_str)) + finally: + + __end_time = round(time.time() * 1000) + # 鍙褰曞ぇ浜�40ms鐨勬暟鎹� + if __end_time - origin_start_time > 100: + l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time, + "l2鏁版嵁澶勭悊鎬昏�楁椂", + True) except Exception as e: logger_l2_error.exception(e) elif type == 1: @@ -221,40 +233,45 @@ elif type == 5: logger_trade_delegate.debug("鎺ユ敹鍒板鎵樹俊鎭�") - # 浜ゆ槗濮旀墭淇℃伅 - dataList = data_process.parseList(_str) - if self.last_trade_delegate_data != _str: - self.last_trade_delegate_data = _str - # 淇濆瓨濮旀墭淇℃伅 - logger_trade_delegate.info(dataList) + __start_time = round(time.time() * 1000) try: - # 璁剧疆鐢虫姤鏃堕棿 - for item in dataList: - apply_time = item["apply_time"] - if apply_time and len(apply_time) >= 8: - code = item["code"] - trade_state = trade_manager.get_trade_state(code) - # 璁剧疆涓嬪崟鐘舵�佺殑浠g爜涓哄凡濮旀墭 - if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: - origin_apply_time = apply_time - apply_time = apply_time[0:6] - apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6]) - ms = origin_apply_time[6:9] - if int(ms) > 500: - # 鏃堕棿+1s - apply_time = tool.trade_time_add_second(apply_time, 1) + # 浜ゆ槗濮旀墭淇℃伅 + dataList = data_process.parseList(_str) + if self.last_trade_delegate_data != _str: + self.last_trade_delegate_data = _str + # 淇濆瓨濮旀墭淇℃伅 + logger_trade_delegate.info(dataList) + try: + # 璁剧疆鐢虫姤鏃堕棿 + for item in dataList: + apply_time = item["apply_time"] + if apply_time and len(apply_time) >= 8: + code = item["code"] + trade_state = trade_manager.get_trade_state(code) + # 璁剧疆涓嬪崟鐘舵�佺殑浠g爜涓哄凡濮旀墭 + if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: + origin_apply_time = apply_time + apply_time = apply_time[0:6] + apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], + apply_time[4:6]) + ms = origin_apply_time[6:9] + if int(ms) > 500: + # 鏃堕棿+1s + apply_time = tool.trade_time_add_second(apply_time, 1) - print(apply_time) - except Exception as e: - logging.exception(e) + print(apply_time) + except Exception as e: + logging.exception(e) - try: - trade_manager.process_trade_delegate_data(dataList) - except Exception as e: - logging.exception(e) - trade_manager.save_trade_delegate_data(dataList) - # 鍒锋柊浜ゆ槗鐣岄潰 - trade_gui.THSGuiTrade().refresh_data() + try: + trade_manager.process_trade_delegate_data(dataList) + except Exception as e: + logging.exception(e) + trade_manager.save_trade_delegate_data(dataList) + # 鍒锋柊浜ゆ槗鐣岄潰 + trade_gui.THSGuiTrade().refresh_data() + finally: + pass elif type == 4: # 琛屼笟浠g爜淇℃伅 @@ -278,28 +295,33 @@ buy_one_price = data["buyOnePrice"] buy_one_volumn = data["buyOneVolumn"] buy_queue = data["buyQueue"] - buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code), - buy_one_price, buy_time, - buy_queue) - if buy_queue_result_list: - # 鏈夋暟鎹� - try: - buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( - decimal.Decimal("0.00")) - buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_, - buy_queue_result_list) - if buy_progress_index is not None: - HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index, - l2_data_manager.local_today_datas.get(code), - l2_data_manager.local_today_num_operate_map.get( - code)) - logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} 鏁版嵁-{}", code, - buy_progress_index, - json.dumps(buy_queue_result_list)) - except Exception as e: - print("涔板叆闃熷垪", code, buy_queue_result_list) - logger_l2_trade_buy_queue.warning("鑾峰彇鎴愪氦浣嶇疆澶辫触锛� code-{} 鍘熷洜-{} 鏁版嵁-{}", code, str(e), - json.dumps(buy_queue_result_list)) + if buy_one_price is None: + print('涔�1浠锋病鏈夛紝', code) + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is not None: + buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time, + buy_queue) + if buy_queue_result_list: + # 鏈夋暟鎹� + try: + buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( + decimal.Decimal("0.00")) + buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_, + buy_queue_result_list) + if buy_progress_index is not None: + HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code), + l2.l2_data_util.local_today_num_operate_map.get( + code)) + logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} 鏁版嵁-{}", code, + buy_progress_index, + json.dumps(buy_queue_result_list)) + except Exception as e: + logging.exception(e) + print("涔板叆闃熷垪", code, buy_queue_result_list) + logger_l2_trade_buy_queue.warning("鑾峰彇鎴愪氦浣嶇疆澶辫触锛� code-{} 鍘熷洜-{} 鏁版嵁-{}", code, str(e), + json.dumps(buy_queue_result_list)) # buy_queue鏄惁鏈夊彉鍖� if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get( @@ -318,7 +340,7 @@ l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") if need_sync: # 鍚屾鏁版嵁 - L2LimitUpMoneyStatisticUtil.verify_num(0, code, int(buy_one_volumn), buy_time) + L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time) # print(buy_time, buy_one_price, buy_one_volumn) # print("L2涔板崠闃熷垪",datas) -- Gitblit v1.8.0