From c20c3c10635ce78db4a86ce9c0bb1d02e90f525d Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 08 八月 2023 17:40:42 +0800 Subject: [PATCH] 单例+缓存优化 --- server.py | 928 +++++++++++++++++++++++++++++++++++++++++++++------------ 1 files changed, 730 insertions(+), 198 deletions(-) diff --git a/server.py b/server.py index 4a6fcd5..eb8eff7 100644 --- a/server.py +++ b/server.py @@ -1,51 +1,61 @@ """ 鎺ュ彈瀹㈡埛绔暟鎹殑鏈嶅姟鍣� """ -import datetime +import decimal import json import logging +import random import socketserver import socket import threading import time -import alert_util -import client_manager -import code_volumn_manager -import data_process -import global_data_loader -import global_util -import gpcode_manager -import authority -import juejin -import l2_data_log -import l2_data_manager -import l2_data_manager_new +from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util +from code_attribute import code_volumn_manager, code_nature_analyse, global_data_loader, gpcode_manager, \ + gpcode_first_screen_manager, first_target_code_data_processor +import constant +from user import authority +import inited_data +from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager import l2_data_util -import limit_up_time_manager -import ths_industry_util -import ths_util -import tool -import trade_data_manager -import trade_gui -import trade_manager -import l2_code_operate -from code_data_util import ZYLTGBUtil +from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer +import l2.l2_data_util -from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \ - logger_l2_trade_queue, logger_l2_latest_data -from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager +from output import code_info_output +from third_data import block_info, kpl_api +from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager +from third_data.history_k_data_util import HistoryKDatasUtils +from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager, KPLLimitUpDataRecordManager +from ths import l2_listen_pos_health_manager, l2_code_operate, client_manager +from trade import trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager, \ + current_price_process_manager, trade_juejin +from code_attribute.code_data_util import ZYLTGBUtil +import l2.transaction_progress + +from log_module.log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \ + logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug +from trade.huaxin import huaxin_trade_record_manager +from trade.trade_manager import TradeTargetCodeModeManager +from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager + +ths_util = import_util.import_lib("ths.ths_util") +trade_gui = import_util.import_lib("trade.trade_gui") class MyTCPServer(socketserver.TCPServer): - def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None): - self.pipe_juejin = pipe_juejin # 澧炲姞鐨勫弬鏁� + def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_trade=None, pipe_ui=None): + self.pipe_trade = pipe_trade # 澧炲姞鐨勫弬鏁� self.pipe_ui = pipe_ui + # 鍒濆鍖栨暟鎹� + block_info.init() socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate) # 濡傛灉浣跨敤寮傛鐨勫舰寮忓垯闇�瑕佸啀閲嶅啓ThreadingTCPServer class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass + + +# 棣栨澘tick绾ф暟鎹� class MyBaseRequestHandle(socketserver.BaseRequestHandler): @@ -55,15 +65,28 @@ ths_l2_trade_queue_manager = thsl2tradequeuemanager() latest_buy1_volumn_dict = {} - buy1_price_manager = Buy1PriceManager() l2_trade_queue_time_dict = {} l2_save_time_dict = {} + l2_trade_buy_queue_dict = {} + tradeBuyQueue = l2.transaction_progress.TradeBuyQueue() + last_time = {} + first_tick_datas = [] + latest_oringin_data = {} + last_l2_listen_health_time = {} + __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager() + __CodesPlateKeysManager = CodesHisReasonAndBlocksManager() + # 鍦↙2鐩戞帶涓婇噰闆嗙殑鐜颁环 + __l2_current_price_data = {} def setup(self): super().setup() # 鍙互涓嶈皟鐢ㄧ埗绫荤殑setup()鏂规硶锛岀埗绫荤殑setup鏂规硶浠�涔堥兘娌″仛 # print("----setup鏂规硶琚墽琛�-----") - # print("鎵撳嵃浼犲叆鐨勫弬鏁帮細", self.server.pipe) + # print("鎵撳嵃浼犲叆鐨勫弬鏁帮細", self.server.pipe_trade) self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance() + + def __notify_trade(self, type_): + if self.server.pipe_trade: + self.server.pipe_trade.send(json.dumps({"type": type_})) def handle(self): host = self.client_address[0] @@ -78,88 +101,134 @@ # print("- " * 30) sk: socket.socket = self.request while True: - data = sk.recv(1024000) + data = sk.recv(1024 * 100) if len(data) == 0: # print("瀹㈡埛绔柇寮�杩炴帴") break _str = str(data, encoding="gbk") if len(_str) > 0: # print("缁撴灉锛�",_str) - type = data_process.parseType(_str) + type = -1 + try: + # 濡傛灉甯︽湁澶� + if _str.startswith("##"): + total_length = int(_str[2:10]) + _str = _str[10:] + # 闃叉socket鏁版嵁鍙戠敓绮樿繛 + while total_length > len(_str): + d = sk.recv(1024 * 100) + if d: + _str += d.decode(encoding='gbk') + type = data_process.parseType(_str) + except Exception as e: + print("鎺ュ彈鍒扮殑寮傚父鏁版嵁锛�", f"{_str[:10]}...{_str[-10:]}") + if str(e).find("Unterminated string starting") > -1: + _str = _str.replace("\n", "") + type = data_process.parseType(_str) + else: + print(_str) return_str = "OK" if type == 0: - try: + origin_start_time = round(time.time() * 1000) __start_time = round(time.time() * 1000) # level2鐩樺彛鏁版嵁 - day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data( + day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data( _str) - # 闂撮殧1s淇濆瓨涓�鏉2鐨勬渶鍚庝竴鏉℃暟鎹� - if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ - code] >= 1000 and len(datas) > 0: - self.l2_save_time_dict[code] = origin_start_time - logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1]) + last_health_time = self.last_l2_listen_health_time.get((client, channel)) + # --------------------------------璁剧疆L2鍋ュ悍鐘舵��-------------------------------- + if last_health_time is None or __start_time - last_health_time > 1000: + self.last_l2_listen_health_time[(client, channel)] = __start_time + # 鏇存柊鐩戝惉浣嶅仴搴风姸鎬� + if origin_datas_count == 0: + l2_listen_pos_health_manager.set_unhealthy(client, channel) + else: + l2_listen_pos_health_manager.set_healthy(client, channel) - # 10ms鐨勭綉缁滀紶杈撳欢鏃� - capture_timestamp = __start_time - process_time - 10 - # print("鎴浘鏃堕棿锛�", process_time) - __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, - "鎴浘鏃堕棿锛歿} 鏁版嵁瑙f瀽鏃堕棿".format(process_time)) + l2_log.threadIds[code] = random.randint(0, 100000) + if True: + # 闂撮殧1s淇濆瓨涓�鏉2鐨勬渶鍚庝竴鏉℃暟鎹� + if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ + code] >= 1000 and len(origin_datas) > 0: + self.l2_save_time_dict[code] = origin_start_time + logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1]) - cid, pid = gpcode_manager.get_listen_code_pos(code) + # 10ms鐨勭綉缁滀紶杈撳欢鏃� + capture_timestamp = __start_time - process_time - 10 + # print("鎴浘鏃堕棿锛�", process_time) + __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, + "鎴浘鏃堕棿锛歿} 鏁版嵁瑙f瀽鏃堕棿".format(process_time)) - __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, - "l2鑾峰彇浠g爜浣嶇疆鑰楁椂") - # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷� - if cid is not None and pid is not None and client == int(cid) and channel == int(pid): - try: - # 鏍¢獙瀹㈡埛绔唬鐮� - l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) - __start_time = round(time.time() * 1000) - if gpcode_manager.is_listen(code): - __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, - "l2澶栭儴鏁版嵁棰勫鐞嗚�楁椂") - l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) - __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, - "l2鏁版嵁鏈夋晥澶勭悊澶栭儴鑰楁椂", - False) - # 淇濆瓨鍘熷鏁版嵁鏁伴噺 - l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) - if round(time.time() * 1000) - __start_time > 20: - l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, - "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂", - False) + cid, pid = gpcode_manager.get_listen_code_pos(code) - except l2_data_manager.L2DataException as l: - # 鍗曚环涓嶇 - if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: - key = "{}-{}-{}".format(client, channel, code) - if key not in self.l2_data_error_dict or round( - time.time() * 1000) - self.l2_data_error_dict[key] > 10000: - # self.l2CodeOperate.repaire_l2_data(code) - # todo 澶晱鎰熺Щ闄や唬鐮� - logger_l2_error.warning("code-{} l2鍗曚环閿欒:{}", code, l.msg) - # 鍗曚环涓嶄竴鑷存椂闇�瑕佺Щ闄や唬鐮侀噸鏂版坊鍔� - l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2鐩戝惉鍗曚环閿欒") - self.l2_data_error_dict[key] = round(time.time() * 1000) + __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, + "l2鑾峰彇浠g爜浣嶇疆鑰楁椂") + # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷� + if cid is not None and pid is not None and client == int(cid) and channel == int(pid): + # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas)) + l2_data_util.save_l2_latest_data_number(code, origin_datas_count) + # 淇濆瓨l2鏁版嵁鏉℃暟 + if not origin_datas: + # or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)): + raise Exception("鏃犳柊澧炴暟鎹�") + # 淇濆瓨鏈�杩戠殑鏁版嵁 + self.latest_oringin_data[code] = origin_datas + limit_up_price = gpcode_manager.get_limit_up_price(code) + datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price) + try: + # 鏍¢獙瀹㈡埛绔唬鐮� + l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) + __start_time = round(time.time() * 1000) + if gpcode_manager.is_listen(code): + __start_time = l2_data_log.l2_time(code, + round(time.time() * 1000) - __start_time, + "l2澶栭儴鏁版嵁棰勫鐞嗚�楁椂") + l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) + __start_time = l2_data_log.l2_time(code, + round(time.time() * 1000) - __start_time, + "l2鏁版嵁鏈夋晥澶勭悊澶栭儴鑰楁椂", + False) + # 淇濆瓨鍘熷鏁版嵁鏁伴噺 + # l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) + # if round(time.time() * 1000) - __start_time > 20: + # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, + # "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂", + # False) - except Exception as e: - print("寮傚父", str(e), code) - logging.exception(e) - logger_l2_error.error("鍑洪敊锛歿}".format(str(e))) - logger_l2_error.error("鍐呭锛歿}".format(_str)) - finally: + except l2_data_manager.L2DataException as l: + # 鍗曚环涓嶇 + if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: + key = "{}-{}-{}".format(client, channel, code) + if key not in self.l2_data_error_dict or round( + time.time() * 1000) - self.l2_data_error_dict[key] > 10000: + # self.l2CodeOperate.repaire_l2_data(code) + # todo 澶晱鎰熺Щ闄や唬鐮� + logger_l2_error.warning("code-{} l2鍗曚环閿欒:{}", code, l.msg) + # 鍗曚环涓嶄竴鑷存椂闇�瑕佺Щ闄や唬鐮侀噸鏂版坊鍔� + l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2鐩戝惉鍗曚环閿欒") + self.l2_data_error_dict[key] = round(time.time() * 1000) - __end_time = round(time.time() * 1000) - # 鍙褰曞ぇ浜�40ms鐨勬暟鎹� - if __end_time - origin_start_time > 100: - l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time, - "l2鏁版嵁澶勭悊鎬昏�楁椂", - True) + except Exception as e: + print("寮傚父", str(e), code) + logging.exception(e) + logger_l2_error.error("鍑洪敊锛歿}".format(str(e))) + logger_l2_error.error("鍐呭锛歿}".format(_str)) + finally: + + __end_time = round(time.time() * 1000) + # 鍙褰曞ぇ浜�40ms鐨勬暟鎹� + if __end_time - origin_start_time > 100: + l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time, + "l2鏁版嵁澶勭悊鎬昏�楁椂", + True) except Exception as e: - logger_l2_error.exception(e) + if str(e).find("鏂板鏁版嵁"): + pass + else: + logger_l2_error.exception(e) + elif type == 1: # 璁剧疆鑲$エ浠g爜 data_list, is_add = data_process.parseGPCode(_str) @@ -168,11 +237,11 @@ for data in data_list: code_list.append(data["code"]) # 鑾峰彇鍩烘湰淇℃伅 - code_datas = juejin.JueJinManager.get_gp_latest_info(code_list) - if is_add: - gpcode_manager.add_gp_list(code_datas) - else: - gpcode_manager.set_gp_list(code_datas) + code_datas = HistoryKDatasUtils.get_gp_latest_info(code_list) + # if is_add: + # gpcode_manager.add_gp_list(code_datas) + # else: + # gpcode_manager.set_gp_list(code_datas) if not is_add: # 鍚屾鍚岃姳椤虹洰鏍囦唬鐮� @@ -192,16 +261,33 @@ gp_list = gpcode_manager.get_gp_list() gp_code_set = set(gp_list) now_str = tool.get_now_time_str() - for d in dataList: - if d["time"] == "00:00:00" or tool.get_time_as_second(now_str) < tool.get_time_as_second( - d["time"]): - continue - if d["code"] not in gp_code_set: - continue + if dataList: + for d in dataList: + if d["time"] == "00:00:00" or tool.get_time_as_second(now_str) < tool.get_time_as_second( + d["time"]): + continue + if d["code"] not in gp_code_set: + continue # 鑾峰彇鏄惁鏈夋定鍋滄椂闂� # if limit_up_time_manager.get_limit_up_time(d["code"]) is None: # limit_up_time_manager.save_limit_up_time(d["code"], d["time"]) + elif type == 22: + print("---鎺ュ彈鍒伴鏉夸唬鐮�") + try: + if int(tool.get_now_time_str().replace(":", "")) < int("092500"): + raise Exception('鏈埌鎺ュ彈鏃堕棿') + # 棣栨澘浠g爜 + dataList, is_add = data_process.parseGPCode(_str) + tick_datas = first_target_code_data_processor.process_first_codes_datas(dataList) + # 淇濆瓨鐜颁环 + self.first_tick_datas.clear() + self.first_tick_datas.extend(tick_datas) + except Exception as e: + logging.exception(e) + finally: + print("棣栨澘浠g爜澶勭悊瀹屾瘯锛�") + return_str = socket_util.load_header(json.dumps({"code": 0}).encode("utf-8")).decode("utf-8") elif type == 3: # 浜ゆ槗鎴愬姛淇℃伅 @@ -214,80 +300,194 @@ elif type == 5: logger_trade_delegate.debug("鎺ユ敹鍒板鎵樹俊鎭�") - # 浜ゆ槗濮旀墭淇℃伅 - dataList = data_process.parseList(_str) - if self.last_trade_delegate_data != _str: - self.last_trade_delegate_data = _str - # 淇濆瓨濮旀墭淇℃伅 - logger_trade_delegate.info(dataList) + __start_time = round(time.time() * 1000) try: - # 璁剧疆鐢虫姤鏃堕棿 - for item in dataList: - apply_time = item["apply_time"] - if apply_time and len(apply_time) >= 8: - code = item["code"] - trade_state = trade_manager.get_trade_state(code) - # 璁剧疆涓嬪崟鐘舵�佺殑浠g爜涓哄凡濮旀墭 - if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: - origin_apply_time = apply_time - apply_time = apply_time[0:6] - apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6]) - ms = origin_apply_time[6:9] - if int(ms) > 500: - # 鏃堕棿+1s - apply_time = tool.trade_time_add_second(apply_time, 1) + # 浜ゆ槗濮旀墭淇℃伅 + dataList = data_process.parseList(_str) + if self.last_trade_delegate_data != _str: + self.last_trade_delegate_data = _str + # 淇濆瓨濮旀墭淇℃伅 + logger_trade_delegate.info(dataList) + try: + # 璁剧疆鐢虫姤鏃堕棿 + for item in dataList: + apply_time = item["apply_time"] + if apply_time and len(apply_time) >= 8: + code = item["code"] + trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code) + # 璁剧疆涓嬪崟鐘舵�佺殑浠g爜涓哄凡濮旀墭 + if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: + origin_apply_time = apply_time + apply_time = apply_time[0:6] + apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], + apply_time[4:6]) + ms = origin_apply_time[6:9] + if int(ms) > 500: + # 鏃堕棿+1s + apply_time = tool.trade_time_add_second(apply_time, 1) - print(apply_time) - l2_data_manager_new.SecondAverageBigNumComputer.set_apply_time(code, apply_time) - except Exception as e: - logging.exception(e) + print(apply_time) + except Exception as e: + logging.exception(e) - try: - trade_manager.process_trade_delegate_data(dataList) - except Exception as e: - logging.exception(e) - trade_manager.save_trade_delegate_data(dataList) - # 鍒锋柊浜ゆ槗鐣岄潰 - trade_gui.THSGuiTrade().refresh_data() + try: + trade_manager.process_trade_delegate_data(dataList) + except Exception as e: + logging.exception(e) + trade_manager.save_trade_delegate_data(dataList) + # 鍒锋柊浜ゆ槗鐣岄潰 + if trade_gui is not None: + trade_gui.THSGuiTrade().refresh_data() + finally: + pass elif type == 4: # 琛屼笟浠g爜淇℃伅 dataList = data_process.parseList(_str) - ths_industry_util.save_industry_code(dataList) + codes = [] + for datas in dataList: + for d in datas: + name = ths_industry_util.get_name_by_code(d['code']) + if not name or name == 'None': + codes.append(d["code"]) + # 鏍规嵁浠g爜鑾峰彇浠g爜鍚嶇О + codes_name = {} + if codes: + codes_name = HistoryKDatasUtils.get_gp_codes_names(codes) + ths_industry_util.save_industry_code(dataList, codes_name) elif type == 6: # 鍙敤閲戦 datas = data_process.parseData(_str) client = datas["client"] money = datas["money"] # TODO瀛樺叆缂撳瓨鏂囦欢 - trade_manager.set_available_money(client, money) + trade_manager.AccountAvailableMoneyManager().set_available_money(client, money) # l2浜ゆ槗闃熷垪 elif type == 10: # 鍙敤閲戦 + __start_time = time.time() datas = data_process.parseData(_str) channel = datas["channel"] code = datas["code"] - data = datas["data"] - buy_time = data["buyTime"] - buy_one_price = data["buyOnePrice"] - buy_one_volumn = data["buyOneVolumn"] - # 淇濆瓨鏈�杩戠殑璁板綍 - if self.ths_l2_trade_queue_manager.save_recod(code, data): - if buy_time != "00:00:00": - logger_l2_trade_queue.info("{}-{}", code, data) - self.buy1_price_manager.save(code, buy_one_price) - need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time, - int(buy_one_volumn), - buy_one_price) - if need_cancel: - l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") - if need_sync: - # 鍚屾鏁版嵁 - l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), - buy_time) - # print(buy_time, buy_one_price, buy_one_volumn) + msg = "" + try: - # print("L2涔板崠闃熷垪",datas) + if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code): + # 娌″湪鐩爣浠g爜涓笖娌℃湁鍦ㄩ鏉夸粖鏃ュ巻鍙蹭唬鐮佷腑 + raise Exception("浠g爜娌″湪鐩戝惉涓�") + + data = datas["data"] + buy_time = data["buyTime"] + buy_one_price = data["buyOnePrice"] + buy_one_volumn = data["buyOneVolumn"] + sell_one_price = data["sellOnePrice"] + sell_one_volumn = data["sellOneVolumn"] + + buy_queue = data["buyQueue"] + if buy_one_price is None: + print('涔�1浠锋病鏈夛紝', code) + limit_up_price = gpcode_manager.get_limit_up_price(code) + + if limit_up_price is not None: + code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_time, limit_up_price, + sell_one_price, sell_one_volumn) + _start_time = time.time() + msg += "涔�1浠锋牸澶勭悊锛�" + f"{_start_time - __start_time} " + + buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, + buy_time, + buy_queue) + msg += "涔伴槦鍒椾繚瀛橈細" + f"{time.time() - _start_time} " + _start_time = time.time() + + if buy_queue_result_list: + + # 鏈夋暟鎹� + try: + buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( + decimal.Decimal("0.00")) + # 鑾峰彇鎵ц浣嶆椂闂� + + buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data( + code) + if True: + # 鍙湁涓嬪崟杩囧悗鎵嶈幏鍙栦氦鏄撹繘搴� + exec_time = None + try: + if buy_exec_index: + exec_time = \ + l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][ + "time"] + except: + pass + buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, + buy_one_price_, + buy_queue_result_list, + exec_time) + if buy_progress_index is not None: + HourCancelBigNumComputer().set_trade_progress(code, buy_time, + buy_exec_index, + buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code), + l2.l2_data_util.local_today_num_operate_map.get( + code)) + LCancelBigNumComputer().set_trade_progress(code, buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code)) + + logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} 鏁版嵁-{}", code, + buy_progress_index, + json.dumps(buy_queue_result_list)) + # 璁$畻澶у崟鎴愪氦棰� + deal_big_money_manager.set_trade_progress(code, buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code), + l2.l2_data_util.local_today_num_operate_map.get( + code)) + + else: + raise Exception("鏆傛湭鑾峰彇鍒颁氦鏄撹繘搴�") + msg += "璁$畻鎴愪氦杩涘害锛�" + f"{time.time() - _start_time} " + _start_time = time.time() + except Exception as e: + logging.exception(e) + print("涔板叆闃熷垪", code, buy_queue_result_list) + logger_l2_trade_buy_queue.warning("鑾峰彇鎴愪氦浣嶇疆澶辫触锛� code-{} 鍘熷洜-{} 鏁版嵁-{}", code, str(e), + json.dumps(buy_queue_result_list)) + + # buy_queue鏄惁鏈夊彉鍖� + if self.l2_trade_buy_queue_dict.get( + code) is None or buy_queue != self.l2_trade_buy_queue_dict.get( + code): + self.l2_trade_buy_queue_dict[code] = buy_queue + logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue) + msg += "淇濆瓨璁板綍鏃ュ織锛�" + f"{time.time() - _start_time} " + _start_time = time.time() + # 淇濆瓨鏈�杩戠殑璁板綍 + if self.ths_l2_trade_queue_manager.save_recod(code, data): + if buy_time != "00:00:00": + logger_l2_trade_queue.info("{}-{}", code, data) + need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time, + int(buy_one_volumn), + buy_one_price) + # if need_sync: + # # 鍚屾鏁版嵁 + # s = time.time() + # L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time) + # msg += "閲忔牎楠岋細"+f"{time.time()-s} " + # print(buy_time, buy_one_price, buy_one_volumn) + + # print("L2涔板崠闃熷垪",datas) + msg += "涔�1澶勭悊锛�" + f"{time.time() - _start_time} " + _start_time = time.time() + except: + pass + finally: + space = time.time() - __start_time + if space > 0.1: + logger_debug.info("{}鎴愪氦闃熷垪澶勭悊鏃堕棿锛歿},{}", code, space, msg) + elif type == 20: # 鐧诲綍 data = data_process.parse(_str)["data"] @@ -299,14 +499,28 @@ return_str = data_process.toJson({"code": 1, "msg": str(e)}) # 鐜颁环鏇存柊 elif type == 40: - data = data_process.parse(_str)["data"] - if data is not None: - print("鐜颁环鏁伴噺", len(data)) - for item in data: - volumn = item["volumn"] - volumnUnit = item["volumnUnit"] + datas = data_process.parse(_str)["data"] + if datas is None: + datas = [] + print("浜屾澘鐜颁环") + # 鑾峰彇鏆傚瓨鐨勪簩鐗堢幇浠锋暟鎹� + if self.first_tick_datas: + datas.extend(self.first_tick_datas) + if datas is not None: + print("浜屾澘鐜颁环鏁伴噺", len(datas)) + for item in datas: + volumn = item["volume"] + volumnUnit = item["volumeUnit"] code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit) - juejin.accpt_prices(data) + current_price_process_manager.accept_prices(datas) + # L2鐜颁环鏇存柊 + elif type == 41: + datas = data_process.parse(_str)["data"] + if datas: + for d in datas: + code = d["code"] + self.__l2_current_price_data[code] = d + elif type == 50: data = data_process.parse(_str)["data"] if data is not None: @@ -325,19 +539,16 @@ # 璁板綍鏁版嵁 logger_buy_1_volumn_record.info("{}-{}", code, data) self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price) - # 淇濆瓨涔�1浠锋牸 - self.buy1_price_manager.save(code, price) # 鏍℃鏃堕棿 time_ = tool.compute_buy1_real_time(time_) # 淇濆瓨鏁版嵁 need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn, price) - if need_cancel: - l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") - if need_sync: - # 鍚屾鏁版嵁 - l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_) - + # if need_cancel: + # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") + # if need_sync: + # # 鍚屾鏁版嵁 + # L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_) elif type == 30: # 蹇冭烦淇℃伅 data = data_process.parse(_str)["data"] @@ -345,21 +556,24 @@ thsDead = data.get("thsDead") logger_device.info("锛坽}锛夊鎴风淇℃伅锛歿}".format(client_id, json.dumps(data))) client_manager.saveClientActive(int(client_id), host, thsDead) - if ths_util.is_ths_dead(client_id): - # TODO 閲嶅惎鍚岃姳椤� - # 鎶ヨ - l2_clients = authority.get_l2_clients() - if client_id in l2_clients: - alert_util.alarm() + if constant.is_windows(): + # 鍔ㄦ�佸鍏� + + if ths_util.is_ths_dead(client_id): + # TODO 閲嶅惎鍚岃姳椤� + # 鎶ヨ + l2_clients = authority.get_l2_clients() + if client_id in l2_clients: + alert_util.alarm() elif type == 60: - # 蹇冭烦淇℃伅 + # L2鑷惎鍔ㄦ垚鍔� data = data_process.parse(_str)["data"] client_id = data["client"] print("L2鑷惎鍔ㄦ垚鍔�", client_id) now_str = tool.get_now_time_str() ts = tool.get_time_as_second(now_str) - # 9鐐�25鍒�9鐐�28涔嬮棿鐨勮嚜鍚姩灏遍渶瑕佹壒閲忚缃唬鐮� - if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00"): + # 9鐐�25鍒�9鐐�28涔嬮棿鐨勮嚜鍚姩灏遍渶瑕佹壒閲忚缃唬鐮�,鐩墠姘歌繙涓嶆墽琛� + if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False: # 鍑嗗鎵归噺璁剧疆浠g爜 return_json = {"code": 1, "msg": "绛夊緟鎵归噺璁剧疆浠g爜"} return_str = json.dumps(return_json) @@ -367,11 +581,11 @@ codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16) codes = sorted(codes) if client_id == 2: - codes = codes[:8] + codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE] else: - codes = codes[8:] + codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:] codes_datas = [] - for i in range(0, 8): + for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE): if i >= len(codes): break codes_datas.append((i, codes[i])) @@ -382,17 +596,285 @@ break else: time.sleep(3) - - else: return_json = {"code": 0, "msg": "寮�鍚湪绾跨姸鎬�"} return_str = json.dumps(return_json) - - # print("蹇冭烦锛�", client_id) - elif type == 100: - # 鍥惧儚璇嗗埆 - return_str = data_process.toJson({"code": 0, "data": {"datas": []}}) + elif type == 70: + # 閫夎偂瀹濈儹闂ㄦ蹇� + data_json = data_process.parse(_str) + day = data_json["day"] + datas = data_json["data"] + # if datas: + # hot_block_data_process.save_datas(day, datas) + print(datas) + elif type == 71: + # 鏍规嵁浠g爜鑾峰彇閫夎偂瀹濈儹闂ㄦ蹇� + day = tool.get_now_date_str() + code = data_process.parse(_str)["data"]["code"] + __start_time = time.time() + final_data = {'code': code, 'data': code_info_output.get_output_html(code)} + return_str = json.dumps({"code": 0, "data": final_data}) + print("浠g爜淇℃伅鑾峰彇鏃堕棿", code, round((time.time() - __start_time) * 1000)) pass + # 鑾峰彇鏈�杩�2涓氦鏄撴棩娑ㄥ仠浠g爜 + elif type == 72: + day = tool.get_now_date_str() + data_dict = {} + for i in range(0, 2): + day = HistoryKDatasUtils.get_previous_trading_date(day) + data_list = list(block_info.KPLLimitUpDataRecordManager.list_all(day)) + codes_set = set() + if data_list: + for d in data_list: + if len(d[4]) > 6: + codes_set.add(d[3]) + data_dict[day] = list(codes_set) + return_str = json.dumps({"code": 0, "data": data_dict}) + elif type == 80: + # 鎾ゅ崟 + data = json.loads(_str) + code = data["data"]["code"] + if code: + state = trade_manager.CodesTradeStateManager().get_trade_state(code) + if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING: + try: + l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "鎵嬪姩鎾ら攢") + return_str = json.dumps({"code": 0}) + except Exception as e: + return_str = json.dumps({"code": 2, "msg": str(e)}) + else: + return_str = json.dumps({"code": 1, "msg": "鏈浜庡彲鎾ゅ崟鐘舵��"}) + else: + return_str = json.dumps({"code": 1, "msg": "璇蜂笂浼犱唬鐮�"}) + + elif type == 82: + # 鑾峰彇濮旀墭鍒楄〃 + data = json.loads(_str) + update_time = data["data"]["update_time"] + results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day( + tool.get_now_date_str("%Y%m%d"), update_time) + return_str = json.dumps( + {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "璇蜂笂浼犱唬鐮�"}) + + elif type == 201: + # 鍔犲叆榛戝悕鍗� + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + l2_trade_util.forbidden_trade(code) + name = gpcode_manager.get_code_name(code) + if not name: + results = HistoryKDatasUtils.get_gp_codes_names([code]) + if results: + gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) + + return_str = json.dumps({"code": 0}) + self.__notify_trade("black_list") + elif type == 202: + # 鍔犲叆鐧藉悕鍗� + data = json.loads(_str) + codes = data["data"]["codes"] + try: + for code in codes: + # 鑷敱娴侀�氬競鍊�>50浜�,鑲′环楂樹簬30鍧楃殑涓嶈兘鍔犵櫧鍚嶅崟 + # limit_up_price = gpcode_manager.get_limit_up_price(code) + # if float(limit_up_price) > 30: + # raise Exception("鑲′环楂樹簬30鍏�") + # zyltgb = global_util.zyltgb_map.get(code) + # if zyltgb is None: + # global_data_loader.load_zyltgb() + # zyltgb = global_util.zyltgb_map.get(code) + # if zyltgb > 50 * 100000000: + # raise Exception("鑷敱娴侀�氳偂鏈ぇ浜�50浜�") + + l2_trade_util.WhiteListCodeManager().add_code(code) + name = gpcode_manager.get_code_name(code) + if not name: + results = HistoryKDatasUtils.get_gp_codes_names([code]) + if results: + gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) + return_str = json.dumps({"code": 0}) + except Exception as e: + return_str = json.dumps({"code": 1, "msg": str(e)}) + self.__notify_trade("white_list") + + elif type == 203: + # 绉婚櫎榛戝悕鍗� + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + l2_trade_util.remove_from_forbidden_trade_codes(code) + return_str = json.dumps({"code": 0}) + self.__notify_trade("black_list") + elif type == 204: + # 绉婚櫎鐧藉悕鍗� + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + l2_trade_util.WhiteListCodeManager().remove_code(code) + return_str = json.dumps({"code": 0}) + self.__notify_trade("white_list") + elif type == 301: + # 榛戝悕鍗曞垪琛� + codes = l2_trade_util.BlackListCodeManager().list_codes() + datas = [] + for code in codes: + name = gpcode_manager.get_code_name(code) + datas.append(f"{name}:{code}") + return_str = json.dumps({"code": 0, "data": datas}) + elif type == 302: + # 榛戝悕鍗曞垪琛� + codes = l2_trade_util.WhiteListCodeManager().list_codes() + datas = [] + for code in codes: + name = gpcode_manager.get_code_name(code) + datas.append(f"{name}:{code}") + return_str = json.dumps({"code": 0, "data": datas}) + + elif type == 401: + # 鍔犲叆鎯宠涔� + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + gpcode_manager.WantBuyCodesManager().add_code(code) + name = gpcode_manager.get_code_name(code) + if not name: + results = HistoryKDatasUtils.get_gp_codes_names([code]) + if results: + gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) + if "plates" in data["data"]: + for i in range(len(data["data"]["plates"])): + self.__KPLCodeLimitUpReasonManager.save_reason(codes[i], data["data"]["plates"][i]) + + return_str = json.dumps({"code": 0}) + self.__notify_trade("want_list") + elif type == 402: + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + gpcode_manager.WantBuyCodesManager().remove_code(code) + return_str = json.dumps({"code": 0}) + self.__notify_trade("want_list") + elif type == 403: + plate = None + include_codes = set() + if _str: + data = json.loads(_str) + plate = data.get("plate") + if plate: + code_map = self.__KPLCodeLimitUpReasonManager.list_all() + for k in code_map: + if code_map[k] == plate: + include_codes.add(k) + + codes = gpcode_manager.WantBuyCodesManager().list_code_cache() + datas = [] + for code in codes: + if plate and plate != '鍏朵粬' and code not in include_codes: + continue + name = gpcode_manager.get_code_name(code) + datas.append(f"{name}:{code}") + + return_str = json.dumps({"code": 0, "data": datas}) + elif type == 411: + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + gpcode_manager.PauseBuyCodesManager().add_code(code) + name = gpcode_manager.get_code_name(code) + if not name: + results = HistoryKDatasUtils.get_gp_codes_names([code]) + if results: + gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) + return_str = json.dumps({"code": 0}) + self.__notify_trade("pause_buy_list") + # 鍔犲叆鏆傚仠涔板叆鍒楄〃 + elif type == 412: + # 绉婚櫎鏆傚仠涔板叆鍒楄〃 + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + gpcode_manager.PauseBuyCodesManager().remove_code(code) + return_str = json.dumps({"code": 0}) + self.__notify_trade("pause_buy_list") + + elif type == 413: + # 鏆傚仠涔板叆鍒楄〃 + codes = gpcode_manager.PauseBuyCodesManager().list_code() + datas = [] + for code in codes: + name = gpcode_manager.get_code_name(code) + datas.append(f"{name}:{code}") + return_str = json.dumps({"code": 0, "data": datas}) + + elif type == 420: + # 鏄惁鍙互鎾ゅ崟 + data = json.loads(_str) + codes = data["data"]["codes"] + code = codes[0] + state = trade_manager.CodesTradeStateManager().get_trade_state(code) + if state != trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS and state != trade_manager.TRADE_STATE_BUY_SUCCESS: + return_str = json.dumps({"code": 0, "msg": "鍙互鍙栨秷"}) + else: + return_str = json.dumps({"code": 1, "msg": "涓嶅彲浠ュ彇娑�"}) + + elif type == 430: + # 鏌ヨ浠g爜灞炴�� + data = json.loads(_str) + code = data["data"]["code"] + # 鏌ヨ鏄惁鎯充拱鍗�/鐧藉悕鍗�/榛戝悕鍗�/鏆備笉涔� + code_name = gpcode_manager.get_code_name(code) + want = gpcode_manager.WantBuyCodesManager().is_in_cache(code) + white = l2_trade_util.WhiteListCodeManager().is_in_cache(code) + black = l2_trade_util.is_in_forbidden_trade_codes(code) + pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code) + + desc_list = [] + if want: + desc_list.append("銆愭兂涔板崟銆�") + if white: + desc_list.append("銆愮櫧鍚嶅崟銆�") + if black: + desc_list.append("銆愰粦鍚嶅崟銆�") + if pause_buy: + desc_list.append("銆愭殏涓嶄拱銆�") + return_str = json.dumps( + {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}}) + + + elif type == 501: + data = json.loads(_str) + is_open = data["data"]["open"] + if is_open: + trade_manager.TradeStateManager().open_buy() + else: + trade_manager.TradeStateManager().close_buy() + return_str = json.dumps({"code": 0, "msg": ("寮�鍚垚鍔�" if is_open else "鍏抽棴鎴愬姛")}) + self.__notify_trade("trade_state") + elif type == 502: + can_buy = trade_manager.TradeStateManager().is_can_buy_cache() + return_str = json.dumps({"code": 0, "data": {"can_buy": can_buy}}) + elif type == 503: + # 璁剧疆浜ゆ槗鐩爣浠g爜鐨勬ā寮� + data = json.loads(_str) + mode = data["data"]["mode"] + try: + TradeTargetCodeModeManager().set_mode(mode) + return_str = json.dumps({"code": 0, "data": {"mode": mode}}) + except Exception as e: + return_str = json.dumps({"code": 1, "msg": str(e)}) + self.__notify_trade("trade_mode") + elif type == 504: + # 鑾峰彇浜ゆ槗鐩爣浠g爜妯″紡 + mode = TradeTargetCodeModeManager().get_mode_cache() + return_str = json.dumps({"code": 0, "data": {"mode": mode}}) + elif type == 601: + pass + # 鍔犺嚜閫� + elif type == 602: + pass + # 绉婚櫎鑷�� + sk.send(return_str.encode()) # print("----------handler end ----------") @@ -404,7 +886,7 @@ def send_msg(client_id, data): _ip = client_manager.getActiveClientIP(client_id) - print("ip", client_id, _ip) + # print("ip", client_id, _ip) if _ip is None or len(_ip) <= 0: raise Exception("瀹㈡埛绔疘P涓虹┖") socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -424,7 +906,7 @@ while True: clients = authority.get_l2_clients() for client in clients: - print("蹇冭烦", client) + # print("蹇冭烦", client) try: send_msg(client, {"action": "test"}) except: @@ -454,11 +936,11 @@ # 鍚屾鐩爣鏍囩殑鍒板悓鑺遍『 def sync_target_codes_to_ths(): - codes = gpcode_manager.get_gp_list() + codes = gpcode_manager.get_second_gp_list() code_list = [] for code in codes: code_list.append(code) - client = authority._get_client_ids_by_rule("client-industry") + client = authority._get_client_ids_by_rule("data-maintain") result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list}) return result @@ -469,11 +951,61 @@ result = json.loads(result) if result["code"] != 0: raise Exception(result["msg"]) + else: + # 娴嬮�熸垚鍔� + client_infos = [] + for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE): + client_infos.append((client, index)) + l2_listen_pos_health_manager.init_all(client_infos) if __name__ == "__main__": - try: - thsl2tradequeuemanager().test() - # repair_ths_main_site(2) - except Exception as e: - print(str(e)) + + # 浜ゆ槗鎴愬姛鏃犳硶璇诲彇鏃跺鐢� + while True: + try: + datas = trade_juejin.get_execution_reports() + # 涓婁紶鏁版嵁 + fdatas = [] + for d in datas: + fdatas.append( + {"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7], "trade_num": d[5], + "type": d[1] - 1}) + print(fdatas) + if fdatas: + try: + trade_manager.process_trade_success_data(fdatas) + except Exception as e: + logging.exception(e) + trade_manager.save_trade_success_data(fdatas) + except: + pass + time.sleep(1.5) + +if __name__ == "__main__1": + codes = gpcode_manager.get_first_gp_codes() + for code in codes: + try: + global_data_loader.load_zyltgb() + limit_up_price = float(gpcode_manager.get_limit_up_price(code)) + volumes_data = inited_data.get_volumns_by_code(code, 150) + volumes_data = volumes_data[1:] + volumes = inited_data.parse_max_volume(volumes_data[:60], + code_nature_analyse.is_new_top(limit_up_price, + volumes_data[:60])) + logger_first_code_record.info("{} 鑾峰彇鍒伴鏉�60澶╂渶澶ч噺锛歿}", code, volumes) + code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2]) + # 鍒ゆ柇K绾垮舰鎬� + k_format = code_nature_analyse.get_k_format( + limit_up_price, volumes_data) + print(k_format) + + code_nature_analyse.set_record_datas(code, + gpcode_manager.get_limit_up_price(code), + volumes_data) + except: + pass + + # code_nature_analyse.set_record_datas(code, + # limit_up_price, + # volumes_data) -- Gitblit v1.8.0