From 892b50e242e3c59a738b92dfdfee1bf1ff8932f2 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 21 十月 2022 16:59:58 +0800 Subject: [PATCH] 新策略修改 --- l2_data_manager_new.py | 629 +++++++++++++ ths_data.py | 4 big_money_num_manager.py | 2 l2_trade_factor.py | 162 ++- trade_gui.py | 504 ++++++++++ limit_up_time_manager.py | 18 authority.py | 47 l2_data_util.py | 168 +++ tool.py | 23 trade_manager.py | 50 l2_trade_test.py | 29 global_util.py | 21 log.py | 66 + data_export_util.py | 23 data_process.py | 69 - mysql_data.py | 85 + alert_util.py | 10 server.py | 61 + juejin.py | 37 trade_queue_manager.py | 134 ++ l2_data_log.py | 9 gui.py | 93 + ths_industry_util.py | 66 + gpcode_manager.py | 40 l2_data_manager.py | 395 +++++++ 25 files changed, 2,453 insertions(+), 292 deletions(-) diff --git a/alert_util.py b/alert_util.py index f12cac2..5646f23 100644 --- a/alert_util.py +++ b/alert_util.py @@ -4,9 +4,15 @@ # 鎶ヨ +import tool + + def alarm(): - AlertUtil().stop_audio() - AlertUtil().play_audio() + if not tool.is_trade_time(): + return + # TODO 鏆傛椂鍏抽棴鎶ヨ + # AlertUtil().stop_audio() + # AlertUtil().play_audio() class AlertUtil: diff --git a/authority.py b/authority.py index 12dd29f..c976654 100644 --- a/authority.py +++ b/authority.py @@ -2,25 +2,39 @@ 鐢ㄦ埛楠岃瘉 """ -import mongo_data +import mysql_data # 鏂板鐢ㄦ埛 def add_user(id, account, pwd, rule): - _dict = {"_id": id, "account": account, "pwd": pwd, "rule": rule} - mongo_data.save_one("clients", _dict) + mysqldb = mysql_data.Mysqldb() + + result = mysqldb.select_one("select * from clients where _id={}".format(id)) + if result is None: + mysqldb.execute( + "insert into clients(_id,account,pwd,rule) values({},'{}','{}','{}')".format(id, account, pwd, rule)) + else: + mysqldb.execute("update clients set account='{}', pwd='{}', rule='{}' where _id={}".format(account, pwd, rule,id)) def add_rule(rule, authritys): - _dict = {"_id": rule, "authritys": authritys} - mongo_data.save_one("clients-authritys", _dict) + mysqldb = mysql_data.Mysqldb() + result = mysqldb.select_one("select * from clients_authritys where _id='{}'".format(rule)) + if result is None: + mysqldb.execute( + "insert into clients_authritys(_id,authritys) values('{}','{}')".format(id, authritys)) + else: + mysqldb.execute( + "update clients_authritys set authritys='{}' where _id={}".format(authritys.replace("\"", "\\" + "\""), + rule)) def _get_client_ids_by_rule(rule): - results = mongo_data.find("clients", {"rule": rule}) + mysqldb = mysql_data.Mysqldb() + results = mysqldb.select_all("select * from clients where rule='{}'".format(rule)) _ids = [] for result in results: - _ids.append(result["_id"]) + _ids.append(result[0]) return _ids @@ -31,15 +45,16 @@ # 鐧诲綍锛岃繑鍥炵敤鎴稩D涓庢潈闄� def login(account, pwd): - results = mongo_data.find("clients", {"account": account}) - if mongo_data.count("clients", {"account": account}) == 0: + mysqldb = mysql_data.Mysqldb() + results = mysqldb.select_all("select * from clients where account='{}'".format(account)) + if len(results) <= 0: raise Exception("璐﹀彿涓嶅瓨鍦�") result = results[0] - if result["pwd"] != pwd: + if result[2] != pwd: raise Exception("瀵嗙爜閿欒") - results = mongo_data.find("clients-authritys", {"_id": result["rule"]}) - return result["_id"], results[0]["authritys"] + results_ = mysqldb.select_one("select * from clients_authritys where _id='{}'".format(result[3])) + return result[0], results_[1] if __name__ == '__main__': @@ -49,15 +64,17 @@ # add_rule("uploader", ["code_upload"]) # # - # add_user(1, "super", "123456", "super") + #add_user(10, "super1", "123456", "super") # add_user(2, "client1", "123456", "client-l2") # add_user(3, "client2", "123456", "client-l2") # add_user(4, "client3", "123456", "client-l2") # add_user(5, "client2", "123456", "client-industry") # add_user(6, "admin", "123456", "uploader") + get_l2_clients() + try: - print(login("client1", "12345")) + print(login("client1", "1234567")) except Exception as e: - print() + print(str(e)) print(get_l2_clients()) diff --git a/big_money_num_manager.py b/big_money_num_manager.py index f9458a0..7e1b78f 100644 --- a/big_money_num_manager.py +++ b/big_money_num_manager.py @@ -30,7 +30,7 @@ num = redis.get("big_money-{}".format(code)) if num is None: return 0 - return int(num) + return round(int(num)/1000/4) if __name__ == "__main__": diff --git a/data_export_util.py b/data_export_util.py index 09c5865..7cc574b 100644 --- a/data_export_util.py +++ b/data_export_util.py @@ -1,6 +1,7 @@ """ 鏁版嵁瀵煎嚭宸ュ叿 """ +import json import os import time @@ -10,6 +11,13 @@ def export_l2_data(code, datas, dest_dir="D:/export/l2"): local_time = time.strftime("%Y%m%dT%H%M%S", time.localtime(time.time())) file_name = "{}/{}_{}.xls".format(dest_dir, code, local_time) + file_name_txt = "{}/{}_{}.txt".format(dest_dir, code, local_time) + openfile = open(file_name_txt,'w') + try: + for data in datas: + openfile.write(json.dumps(data)+"\n") + finally: + openfile.close() wb = xlwt.Workbook() ws = wb.add_sheet('sheet1') ws.write(0, 0, '搴忓彿') @@ -37,7 +45,10 @@ ws.write(index, 2, cancel_time) ws.write(index, 3, data["val"]["price"]) - ws.write(index, 4, data["val"]["num"]) + if int(data["val"]["operateType"]) == 1 or int(data["val"]["operateType"]) == 2: + ws.write(index, 4, 0-int(data["val"]["num"])) + else: + ws.write(index, 4, int(data["val"]["num"])) limit_price="" if int(data["val"]["limitPrice"]) == 1: @@ -55,6 +66,16 @@ ws.write(index, 5, '涔版挙 ({})'.format(limit_price)) else: ws.write(index, 5, '涔版挙') + elif int(data["val"]["operateType"]) == 2: + if len(limit_price) > 0: + ws.write(index, 5, '鍗� ({})'.format(limit_price)) + else: + ws.write(index, 5, '鍗�') + elif int(data["val"]["operateType"]) == 3: + if len(limit_price) > 0: + ws.write(index, 5, '鍗栨挙 ({})'.format(limit_price)) + else: + ws.write(index, 5, '鍗栨挙') ws.write(index, 6, data["re"]) wb.save(file_name) return file_name diff --git a/data_process.py b/data_process.py index 2dafd7d..fbe6a51 100644 --- a/data_process.py +++ b/data_process.py @@ -5,9 +5,9 @@ import time as t import authority +import mysql_data import redis_manager import gpcode_manager -import mongo_data # 缁熻浠婃棩鍗栧嚭 # 缁熻浠婃棩涔板叆 @@ -17,27 +17,6 @@ __redisManager = redis_manager.RedisManager(0) - -def _mysql_insert_data(day, code, item, conn): - try: - with conn.cursor() as cursor: - sql = f"insert into level2_data(day,code,time,price,num,limit_price,operate_type,cancel_time,cancel_time_unit, md5,create_time) values ('{day}','{code}','{item['time']}','{item['price']}',{item['num']},{item['limitPrice']},{item['operateType']},{item['cancelTime']},{item['cancelTimeUnit']},'{item['md5']}',now())" - print(sql) - cursor.execute(sql) - conn.commit() - except Exception as e: - conn.rollback() - - -def _mysql_update_data(item, conn): - try: - with conn.cursor() as cursor: - sql = "update level2_data set re = {}, update_time=now() where md5='{}'".format(item['re'], item['md5']) - print(sql) - cursor.execute(sql) - conn.commit() - except Exception as e: - conn.rollback() def parse(str): @@ -117,53 +96,21 @@ return json.loads(data_str) -def _getIndustry(datas): - ors = [] - codes = set() - for data in datas: - codes.add(data["code"]) - - for code in codes: - ors.append({'first_code': code}) - result = mongo_data.find("ths-industry", {'$or': ors}) - - _fname = None - for a in result: - _fname = a["_id"] - break - print("鏈�缁堢殑浜岀骇琛屼笟鍚嶇О涓猴細", _fname) - return _fname - - -def saveIndustryCode(datasList): - for datas in datasList: - # 鏌ヨ杩欐壒鏁版嵁鎵�灞炶涓� - industry_name = _getIndustry(datas); - _list = [] - for data in datas: - # 淇濆瓨 - _dict = {"_id": data["code"]} - _dict["second_industry"] = industry_name - _dict["zyltgb"] = data["zyltgb"] - _dict["zyltgb_unit"] = data["zyltgb_unit"] - _dict["update_time"] = int(round(t.time() * 1000)) - _list.append(_dict) - mongo_data.save("ths-industry-codes", _list) - - # 淇濆瓨鑷敱娴侀�氬競鍊� def saveZYLTSZ(datasList): - redis = __redisManager.getRedis() - _list = [] + mysqldb = mysql_data.Mysqldb() for data in datasList: # 淇濆瓨 _dict = {"_id": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgb_unit"], "update_time": int(round(t.time() * 1000))} if float(data["zyltgb"]) > 0: - _list.append(_dict) # 淇濆瓨10澶� ZYLTGBUtil.save(data["code"], data["zyltgb"], data["zyltgb_unit"]) - mongo_data.save("ths-zylt", _list) + result = mysqldb.select_one("select * from ths_zylt where _id='{}'".format(data["code"])) + if result is None: + mysqldb.execute("insert into ths_zylt(_id,zyltgb,zyltgb_unit,update_time) values ('{}','{}',{},{})".format(data["code"],data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000))) + else: + mysqldb.execute("update ths_zylt set zyltgb='{}',zyltgb_unit={},update_time={} where _id='{}'".format(data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000),data["code"])) def saveClientActive(client_id, host, thsDead): @@ -182,7 +129,7 @@ _id = k.split("client-active-")[1] # 瀹㈡埛绔悓鑺遍『娌″崱姝绘墠鑳藉姞鍏� if not ths_util.is_ths_dead(_id): - client_ids.append(_id) + client_ids.append(int(_id)) l2_clients = authority.get_l2_clients() return list(set(client_ids).intersection(set(l2_clients))) diff --git a/global_util.py b/global_util.py index bbc3194..9502d52 100644 --- a/global_util.py +++ b/global_util.py @@ -3,16 +3,14 @@ """ # 浠g爜琛屼笟鏄犲皠 -import pymongo -import mongo_data import code_volumn_manager import gpcode_manager import ths_industry_util from code_data_util import ZYLTGBUtil -TEST = False +TEST = True code_industry_map = {} # 琛屼笟浠g爜鏄犲皠 @@ -23,6 +21,12 @@ today_limit_up_codes = {} # 琛屼笟鐑害鎸囨暟 industry_hot_num = {} +# 娑ㄥ仠鑲$エ鐨勬定骞� +limit_up_codes_percent = {} + +# 鍚嶇О浠g爜鏄犲皠 +name_codes = {} + # 浠婃棩閲� today_volumn = {} # 60鏃ユ渶澶ч噺 @@ -39,6 +43,7 @@ load_volumn() load_zyltgb() load_industry() + load_name_codes() # 鍔犺浇琛屼笟鏁版嵁 @@ -59,6 +64,14 @@ zyltgb_map[code] = result +# 鍔犺浇鍚嶇О浠g爜闅愬皠 +def load_name_codes(): + dict_ = gpcode_manager.get_name_codes() + if dict_: + for key in dict_: + name_codes[key] = dict_[key] + + # 鍔犺浇閲� def load_volumn(): codes = gpcode_manager.get_gp_list() @@ -72,6 +85,8 @@ # 娣诲姞浠婃棩娑ㄥ仠鏁版嵁 def add_limit_up_codes(datas, clear=False): + if datas is None: + return if clear: today_limit_up_codes.clear() # 娑ㄥ仠鏁伴噺 diff --git a/gpcode_manager.py b/gpcode_manager.py index 6c0e3fd..9a0f239 100644 --- a/gpcode_manager.py +++ b/gpcode_manager.py @@ -20,22 +20,49 @@ # 鑾峰彇鍩烘湰淇℃伅 code_datas = juejin.JueJinManager.get_gp_latest_info(gpset) codes = [] + name_codes = {} for _data in code_datas: # 姝e父鐨勮偂绁� if _data["sec_type"] == 1 and _data["sec_level"] == 1: code = _data["symbol"].split(".")[1] if code.find("30") != 0 and code.find("68") != 0: + name = _data["sec_name"] codes.append(code) + # 淇濆瓨浠g爜瀵瑰簲鐨勫悕绉� + name_codes[name] = code redis_instance = __redisManager.getRedis() # 鍒犻櫎涔嬪墠鐨� redis_instance.delete("gp_list") + redis_instance.delete("gp_list_names") for d in codes: redis_instance.sadd("gp_list", d) + redis_instance.set("gp_list_names", json.dumps(name_codes)) + + +# 鑾峰彇鍚嶇О瀵瑰簲鐨勪唬鐮� +def get_name_code(name): + redis_instance = __redisManager.getRedis() + val = redis_instance.get("gp_list_names") + if not val: + return None + val = json.loads(val) + return val.get(name) + + +def get_name_codes(): + redis_instance = __redisManager.getRedis() + val = redis_instance.get("gp_list_names") + if not val: + return None + val = json.loads(val) + return val # 娑ㄥ仠鐘佺鍧� def set_limit_up_list(gpset): + if gpset is None: + return # 淇濆瓨鍒板唴瀛樹腑 global_util.add_limit_up_codes(gpset) # 鑾峰彇鍩烘湰淇℃伅 @@ -45,7 +72,7 @@ for d in gpset: redis_instance.sadd("gp_limit_up_list", json.dumps(d)) redis_instance.expire("gp_limit_up_list", tool.get_expire()) - redis_instance.setex("gp_limit_up_list_update_time",tool.get_expire(),round( time.time()*1000)) + redis_instance.setex("gp_limit_up_list_update_time", tool.get_expire(), round(time.time() * 1000)) # 鑾峰彇娑ㄥ仠鍒楄〃 @@ -92,7 +119,7 @@ # 璁剧疆鏀剁洏浠� def set_price_pre(code, price): - codes= get_gp_list() + codes = get_gp_list() if code not in codes: return redis_instance = __redisManager.getRedis() @@ -145,11 +172,10 @@ # 鏍规嵁浣嶇疆鑾峰彇姝e湪鐩戝惉鐨勪唬鐮� def get_listen_code_by_pos(client_id, pos): redis_instance = __redisManager.getRedis() - key="listen_code-{}-{}".format(client_id, pos) + key = "listen_code-{}-{}".format(client_id, pos) value = redis_instance.get(key) # print("redis:", key,value) return value - # 璁剧疆浣嶇疆鐨勭洃鍚唬鐮� @@ -221,7 +247,7 @@ def is_listen_full(): clients = data_process.getValidL2Clients() codes = get_listen_codes() - return len(codes) >= 8*len(clients) + return len(codes) >= 8 * len(clients) # 鏄惁姝e湪鎿嶄綔 @@ -248,5 +274,5 @@ # print(is_listen_full()) # print(is_listen("002271")) # print(get_listen_code_pos("002272")) - code= get_listen_code_by_pos(2, 7) - print(code) \ No newline at end of file + code = get_listen_code_by_pos(2, 7) + print(code) diff --git a/gui.py b/gui.py index 8bfe3ce..d4fc3aa 100644 --- a/gui.py +++ b/gui.py @@ -12,8 +12,9 @@ import multiprocessing import global_util +import log +import mysql_data import redis_manager -import mongo_data import server import trade_gui from l2_code_operate import L2CodeOperate @@ -22,12 +23,39 @@ from server import * -def createServer(pipe): +# 璇诲彇server杩涚▼鐨勬秷鎭� +def __read_server_pipe(pipe): + while True: + value = pipe.recv() + if value is not None: + value = json.loads(value) + if value.get("type") == "clear_l2": + code = value["data"]["code"] + print("娓呴櫎l2鏁版嵁", code) + if len(code) != 6: + continue + l2_data_manager.clear_l2_data(code) + # 鍒犻櫎level2鐨勬暟鎹� + if l2_data_manager.local_today_datas and code in l2_data_manager.local_today_datas: + l2_data_manager.local_today_datas.pop(code) + if l2_data_manager.local_latest_datas and code in l2_data_manager.local_latest_datas: + l2_data_manager.local_latest_datas.pop(code) + + time.sleep(0.1) + + +def createServer(pipe_juejin, pipe_gui): print("create SocketServer") # 鍒濆鍖栧弬鏁� global_util.init() + + t1 = threading.Thread(target=lambda: __read_server_pipe(pipe_gui)) + # 鍚庡彴杩愯 + t1.setDaemon(True) + t1.start() + laddr = "", 9001 - tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe=pipe) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle + tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_juejin=pipe_juejin) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle # tcpserver.handle_request() # 鍙帴鍙椾竴涓鎴风杩炴帴 tcpserver.serve_forever() # 姘镐箙寰幆鎵ц,鍙互鎺ュ彈澶氫釜瀹㈡埛绔繛鎺� @@ -37,11 +65,17 @@ class GUI: + + def __init__(self): p1, p2 = multiprocessing.Pipe() + gs_gui_pipe, gs_server_pipe = multiprocessing.Pipe() - self.serverProcess = multiprocessing.Process(target=createServer, args=(p1,)) + self.serverProcess = multiprocessing.Process(target=createServer, args=(p1, gs_server_pipe,)) self.jueJinProcess = multiprocessing.Process(target=startJueJin, args=(p2,)) + self.p1 = p1 + self.p2 = p2 + self.gs_gui_pipe = gs_gui_pipe # L2鏄剧ず self.l2_codes = {} # 鑾峰彇l2鐨勫鎴风鍒楄〃 @@ -51,6 +85,19 @@ for i in range(0, 8): code = gpcode_manager.get_listen_code_by_pos(client_id, i) self.l2_codes[client_id].append(code) + + # 璇诲彇server杩涚▼鐨勬秷鎭� + def __read_gui_server_pipe(self,pipe): + while True: + value = pipe.recv() + if value is not None: + value = json.loads(value) + if value.get("type") == "l2_data_notify": + code = value["data"]["code"] + count =value["data"]["count"] + print("l2鏁版嵁閫氱煡锛歿}-{}", code,count) + + time.sleep(0.1) def run(self): # TODO @@ -63,6 +110,11 @@ # 瀹㈡埛绔痵erver杩炴帴 t1 = threading.Thread(target=lambda: server.test_client_server()) + # 鍚庡彴杩愯 + t1.setDaemon(True) + t1.start() + + t1 = threading.Thread(target=lambda: self.__read_gui_server_pipe(self.gs_gui_pipe)) # 鍚庡彴杩愯 t1.setDaemon(True) t1.start() @@ -93,12 +145,12 @@ _set_error_color(text, 1, error) # 楠岃瘉mongodb try: - count = mongo_data.count("clients", {}) - if count < 1: + counts = mysql_data.Mysqldb().select_one("select count(*) from clients") + if counts[0] < 1: raise Exception("") - text.insert(END, "mongodb杩炴帴鎴愬姛锛乗n") + text.insert(END, "mysql杩炴帴鎴愬姛锛乗n") except: - error = "mongodb杩炴帴澶辫触...\n" + error = "mysql杩炴帴澶辫触...\n" text.insert(END, error) _set_error_color(text, 2, error) pass @@ -221,18 +273,18 @@ # 缁樺埗浜ゆ槗鐘舵�� def __draw_trade_state(self, frame): def refresh_data(): - normal=True + normal = True if l2_code_operate.L2CodeOperate.is_read_queue_valid(): cl_queue.configure(text="姝e父", foreground="#008000") else: cl_queue.configure(text="寮傚父", foreground="#FF7F27") - normal=False + normal = False try: trade_gui.THSGuiTrade.checkEnv() cl_win.configure(text="姝e父", foreground="#008000") except Exception as e: normal = False - cl_win.configure(text="寮傚父:{}".format(str(e)),foreground="#FF7F27") + cl_win.configure(text="寮傚父:{}".format(str(e)), foreground="#FF7F27") # 鐘舵�佹湁闂锛岄渶瑕佹姤璀� if not normal: alert_util.alarm() @@ -247,11 +299,10 @@ pass time.sleep(2) - start_y=230 + start_y = 230 btn = Button(frame, text="鍒锋柊鐘舵��", command=refresh_data) btn.place(x=10, y=start_y) - auo_refresh = IntVar() ch1 = Checkbutton(frame, text='鑷姩鍒锋柊', variable=auo_refresh, onvalue=1, offvalue=0, background="#DDDDDD", @@ -260,7 +311,7 @@ auo_refresh.set(1) ch1.place(x=100, y=start_y) - y_=start_y+30 + y_ = start_y + 30 cl = Label(frame, text="鎿嶄綔闃熷垪鐘舵�侊細", bg="#DDDDDD") cl.place(x=10, y=y_) cl_queue = Label(frame, text="鏈煡", bg="#DDDDDD") @@ -687,6 +738,10 @@ m = L2TradeFactorUtil.compute_m_value(code) showinfo("鎻愮ず", "{}".format(m)) + def clear_l2(code): + self.gs_gui_pipe.send(json.dumps({"type": "clear_l2", "data": {"code": code}})) + pass + frame = Frame(root, {"height": 280, "width": 300, "bg": "#DDDDDD"}) frame.grid(row=2, column=2, rowspan=2, pady=5) btntext = StringVar() @@ -723,11 +778,17 @@ btn.place(x=220, y=100) btn = Button(frame, text="鑾峰彇m鍊�", command=lambda: compute_m(code.get())) - btn.place(x=10, y=120) + btn.place(x=10, y=130) + + btn = Button(frame, text="瀵煎嚭浜ゆ槗鏃ュ織", command=lambda: log.export_l2_log(code.get())) + btn.place(x=80, y=130) + + btn = Button(frame, text="娓呯┖l2鏁版嵁", command=lambda: clear_l2(code.get())) + btn.place(x=150, y=130) # 浜ゆ槗鎸夐挳 btn = Button(frame, textvariable=btntext, command=startJueJinGui) - btn.place(x=10, y=150) + btn.place(x=10, y=160) btntext.set("鍚姩鎺橀噾") btn = Button(frame, text="閲嶆柊璁㈤槄琛屾儏", command=resub) diff --git a/juejin.py b/juejin.py index 1c171c3..20379aa 100644 --- a/juejin.py +++ b/juejin.py @@ -6,6 +6,7 @@ import datetime import json +import logging import time as t import schedule @@ -24,10 +25,15 @@ import redis_manager import authority import decimal + +import trade_gui from l2_code_operate import L2CodeOperate +from l2_data_manager import L2LimitUpMoneyStatisticUtil from log import logger_juejin_tick, logger_system +from trade_queue_manager import JueJinBuy1VolumnManager redisManager = redis_manager.RedisManager() +__jueJinBuy1VolumnManager = JueJinBuy1VolumnManager() # 璁剧疆璐︽埛淇℃伅 @@ -173,8 +179,6 @@ def on_tick(context, tick): - if global_util.TEST: - return # print(tick["created_at"]) relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60 # 9鐐�20-15:05鎺ュ彈鏁版嵁 @@ -193,6 +197,16 @@ # 淇濆瓨鏈�鏂颁环 symbol = symbol.split(".")[1] + time_ = tick["created_at"].strftime("%H:%M:%S") + data_=(symbol,time_,tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"]) + logger_juejin_tick.info("涔�1閲� {},{},{},{}", data_[1], data_[0], data_[2], + data_[3]) + need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2],data_[3]) + if need_sync: + # 鍚屾鏁版嵁 + L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1]) + + # print(tick["created_at"],tick["quotes"][0]["bid_v"]) accpt_price(symbol, price) __prices_now[symbol] = price @@ -220,6 +234,7 @@ # 鑾峰彇鍒扮幇浠� def accpt_prices(prices): + print("浠锋牸浠g爜鏁伴噺锛�", len(prices)) now_str = datetime.datetime.now().strftime("%H:%M:%S") now_strs = now_str.split(":") now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2]) @@ -237,7 +252,7 @@ # 鑾峰彇鏀剁洏浠� pricePre = gpcode_manager.get_price_pre(code) if pricePre is not None: - rate = round((price - pricePre) * 100 / pricePre, 1) + rate = round((price - pricePre) * 100 / pricePre, 2) if rate >= 0: # 鏆傚瓨娑ㄥ箙涓烘鐨勪唬鐮� _code_list.append((rate, code)) @@ -246,9 +261,23 @@ _delete_list.append((rate, code)) # 鎺掑簭 new_code_list = sorted(_code_list, key=lambda e: e.__getitem__(0), reverse=True) + # 棰勫~鍏呬笅鍗曚唬鐮� + _buy_win_codes = [] + for d in new_code_list: + _buy_win_codes.append(d[1]) + for d in _delete_list: + _buy_win_codes.append(d[1]) + try: + trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes) + except Exception as e: + logging.exception(e) + pass + client_ids = data_process.getValidL2Clients() # 鏈�澶氬~鍏呯殑浠g爜鏁伴噺 max_count = len(client_ids) * 8 + if max_count == 0: + max_count = 8 # 鎴彇鍓嶅嚑涓唬鐮佸~鍏� add_list = new_code_list[:max_count] # 鍚庨潰鐨勪唬鐮佸叏閮ㄥ垹闄� @@ -272,8 +301,6 @@ for code in add_code_list: if not gpcode_manager.is_listen(code): L2CodeOperate.get_instance().add_operate(1, code, "鐜颁环鍙樺寲") - - print(add_code_list, del_list) diff --git a/l2_data_log.py b/l2_data_log.py new file mode 100644 index 0000000..a0e7732 --- /dev/null +++ b/l2_data_log.py @@ -0,0 +1,9 @@ +# l2鏁版嵁鐨勬棩蹇� +import time + +import log + + +def l2_time(code, time_, description, new_line=False): + log.logger_l2_process_time.info("{}: {}-{}{}", description, code, time_, "\n" if new_line else "") + return int(time.time() * 1000) diff --git a/l2_data_manager.py b/l2_data_manager.py index c19499f..a5eca4d 100644 --- a/l2_data_manager.py +++ b/l2_data_manager.py @@ -73,9 +73,9 @@ _key = "buy_compute_index_info-{}".format(code) _data_json = redis.get(_key) if _data_json is None: - return None, None, None, 0 + return None, None, None, 0, 0 _data = json.loads(_data_json) - return _data[0], _data[1], _data[2], _data[3] + return _data[0], _data[1], _data[2], _data[3], _data[4] # 璁剧疆涔板叆鐐圭殑鍊� # buy_single_index 涔板叆淇″彿浣� @@ -83,16 +83,16 @@ # compute_index 璁$畻浣嶇疆 # nums 绱绾拱棰� @staticmethod - def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums): + def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count): redis = TradePointManager.__get_redis() expire = tool.get_expire() _key = "buy_compute_index_info-{}".format(code) if buy_single_index is not None: - redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums))) + redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count))) else: - _buy_single_index, _buy_exec_index, _compute_index, _nums = TradePointManager.get_buy_compute_start_data( + _buy_single_index, _buy_exec_index, _compute_index, _nums, _count = TradePointManager.get_buy_compute_start_data( code) - redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums))) + redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count))) # 鑾峰彇鎾や拱鍏ュ紑濮嬭绠楃殑淇℃伅 # 杩斿洖鏁版嵁鐨勫唴瀹逛负锛氭挙閿�鐐圭储寮� 鎾や拱绾拱棰� 璁$畻鐨勬暟鎹储寮� @@ -265,6 +265,16 @@ saveL2Data(code, add_datas) +# 娓呴櫎l2鏁版嵁 +def clear_l2_data(code): + redis_l2 = redis_manager.RedisManager(1).getRedis() + keys = redis_l2.keys("l2-{}-*".format(code)) + for k in keys: + redis_l2.delete(k) + + redis_l2.delete("l2-data-latest-{}".format(code)) + + class L2DataUtil: @classmethod def is_same_time(cls, time1, time2): @@ -363,10 +373,11 @@ else: limitPrice = 0 item["limitPrice"] = "{}".format(limitPrice) - # 涓嶉渶瑕侀潪娑ㄥ仠鏁版嵁/闈炶穼鍋滄暟鎹� - if int(item["limitPrice"]) == 0: - continue operateType = item["operateType"] + # 涓嶉渶瑕侀潪娑ㄥ仠涔颁笌涔版挙 + if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1): + continue + cancelTime = item["cancelTime"] cancelTimeUnit = item["cancelTimeUnit"] key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime, @@ -380,6 +391,8 @@ dataIndexs.setdefault(key, len(datas) - 1) l2_data_util.save_big_data(code, same_time_num, data) return datas + + @classmethod def get_time_as_second(cls, time_str): @@ -406,6 +419,20 @@ return False return True + @classmethod + def is_limit_up_price_sell(cls, val): + if int(val["limitPrice"]) != 1: + return False + + if int(val["operateType"]) != 2: + return False + + price = float(val["price"]) + num = int(val["num"]) + if price * num * 100 < 50 * 10000: + return False + return True + # 鏄惁娑ㄥ仠涔版挙 @classmethod def is_limit_up_price_buy_cancel(cls, val): @@ -420,6 +447,20 @@ if price * num * 100 < 50 * 10000: return False return True + + # 鏄惁鍗栨挙 + @classmethod + def is_sell_cancel(cls, val): + if int(val["operateType"]) == 3: + return True + return False + + # 鏄惁涓哄崠 + @classmethod + def is_sell(cls, val): + if int(val["operateType"]) == 2: + return True + return False # L2浜ゆ槗鏁版嵁澶勭悊鍣� @@ -484,17 +525,17 @@ latest_time = add_datas[len(add_datas) - 1]["val"]["time"] # 鏃堕棿宸笉鑳藉お澶ф墠鑳藉鐞� # TODO 鏆傛椂鍏抽棴澶勭悊 - if L2DataUtil.is_same_time(now_time_str, latest_time): - # 鍒ゆ柇鏄惁宸茬粡鎸傚崟 - state = trade_manager.get_trade_state(code) - start_index = len(total_datas) - len(add_datas) - end_index = len(total_datas) - 1 - if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: - # 宸叉寕鍗� - cls.__process_order(code, start_index, end_index, capture_timestamp) - else: - # 鏈寕鍗� - cls.__process_not_order(code, start_index, end_index, capture_timestamp) + # if L2DataUtil.is_same_time(now_time_str, latest_time): + # # 鍒ゆ柇鏄惁宸茬粡鎸傚崟 + # state = trade_manager.get_trade_state(code) + # start_index = len(total_datas) - len(add_datas) + # end_index = len(total_datas) - 1 + # if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: + # # 宸叉寕鍗� + # cls.__process_order(code, start_index, end_index, capture_timestamp) + # else: + # # 鏈寕鍗� + # cls.__process_not_order(code, start_index, end_index, capture_timestamp) logger_l2_process.info("code:{} 澶勭悊鏁版嵁鑼冨洿: {}-{} 澶勭悊鏃堕棿:{}", code, add_datas[0]["index"], add_datas[-1]["index"], round(t.time() * 1000) - __start_time) # 淇濆瓨鏁版嵁 @@ -722,6 +763,8 @@ except Exception as e: cls.debug(code, "鎵ц涔板叆寮傚父:{}", str(e)) pass + finally: + cls.debug(code, "m鍊煎奖鍝嶅洜瀛愶細", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) # 鏄惁鍙互涔� @classmethod @@ -774,10 +817,19 @@ # 鍒犻櫎澶х兢鎾や簨浠剁殑澶у崟 L2BetchCancelBigNumProcessor.del_recod(code) L2ContinueLimitUpCountManager.del_data(code) + if code in cls.unreal_buy_dict: cls.unreal_buy_dict.pop(code) + # 鍙栨秷涔板叆鏍囪瘑 + TradePointManager.delete_buy_point(code) + TradePointManager.delete_buy_cancel_point(code) + TradePointManager.delete_compute_info_for_cancel_buy(code) + TradePointManager.delete_count_info_for_cancel_buy(code) + # 鍒犻櫎澶х兢鎾や簨浠剁殑澶у崟 + L2BetchCancelBigNumProcessor.del_recod(code) else: cls.__cancel_buy(code) + L2BigNumProcessor.del_big_num_pos(code) @classmethod @@ -905,7 +957,7 @@ count += datas[i]["re"] if count >= continue_count: return True, start - else: + elif not L2DataUtil.is_limit_up_price_sell(_val): last_index = None count = 0 start = None @@ -931,7 +983,7 @@ start = i start_time = L2DataUtil.get_time_as_second(_val["time"]) count += datas[i]["re"] - else: + elif not L2DataUtil.is_limit_up_price_sell(_val): if count >= continue_count: return start, i - 1 start = -1 @@ -967,7 +1019,7 @@ start = i start_time = L2DataUtil.get_time_as_second(_val["time"]) count += int(datas[i]["re"]) - else: + elif not L2DataUtil.is_limit_up_price_sell(_val): if count >= continue_count: return start, i - 1 start = -1 @@ -1323,7 +1375,7 @@ @classmethod def test_can_order(cls): - code = "002393" + code = "000948" global_util.load_industry() limit_up_time_manager.load_limit_up_time() @@ -1618,8 +1670,9 @@ if need_cancel: # 闇�瑕佹挙鍗� # 鎾ゅ崟 - cls.__cancel_buy(code, max_num_data["index"]) - L2TradeDataProcessor.cancel_debug(code, "璺熻釜鍒板ぇ鍗曟棤鎾や拱淇″彿-{}锛屾柊璺熻釜鐨勫ぇ鍗曢渶瑕佹挙涔�-{}", index, max_num_data["index"]) + cls.__cancel_buy(code, max_num_data["index"] if cancel_data is None else cancel_data) + L2TradeDataProcessor.cancel_debug(code, "鍘熸潵璺熻釜鍒板ぇ鍗曟棤鎾や拱淇″彿-{}锛屾柊璺熻釜鐨勫ぇ鍗曢渶瑕佹挙涔�-{}", index, + max_num_data["index"]) return True, cancel_data else: # 鏃犻渶鎾ゅ崟 @@ -1695,8 +1748,8 @@ if i <= latest_buy_index: total_count += total_datas[i]["re"] L2TradeDataProcessor.debug(code, "澶х兢鎾ゅぇ鍗曟暟閲忥細{}/{}", count, total_count) - # 澶у崟灏忎簬5绗旀棤鑴戞挙 - if total_count <= 5: + # 澶у崟灏忎簬5绗旀棤鑴戞挙锛屽悗淇敼涓烘棤澶у崟鏃犺剳鎾� + if total_count <= 0: return True # 澶у崟鎾ゅ崟绗旀暟澶т簬鎬诲ぇ鍗曠瑪鏁扮殑1/5灏辨挙鍗� @@ -1788,6 +1841,287 @@ index_set.add(d[1]) big_nums_info_new.append(d) cls.__save_recod(code, max_big_num_info, big_nums_info_new) + + +# 鍗栬窡韪� +class L2SellProcessor: + @classmethod + def __get_recod(cls, code): + redis = _redisManager.getRedis() + _val = redis.get("sell_num-{}".format(code)) + if _val is None: + return None, None + else: + datas = json.loads(_val) + return datas[0], datas[1] + + @classmethod + def del_recod(cls, code): + redis = _redisManager.getRedis() + key = "sell_num-{}".format(code) + redis.delete(key) + + @classmethod + def __save_recod(cls, code, process_index, count): + redis = _redisManager.getRedis() + key = "sell_num-{}".format(code) + redis.setex(key, tool.get_expire(), json.dumps((process_index, count))) + + # 鏆傛椂寮冪敤 + @classmethod + def need_cancel(cls, code, start_index, end_index): + # 鏄惁闇�瑕佹挙鍗� + process_index, count = cls.__get_recod(code) + if process_index is None: + # 鏃犲崠鐨勪俊鎭� + return False + if count is None: + count = 0 + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is None: + return False + if float(limit_up_price) * count * 100 >= l2_trade_factor.L2TradeFactorUtil.get_base_safe_val( + global_util.zyltgb_map[code]): + return True + return False + + @classmethod + def process(cls, code, start_index, end_index): + # 澶勭悊澶у崟 + # 鑾峰彇澶у崟鍒楄〃,澶у崟鏍煎紡涓�:((num,index,re),[(num,index,re),(num,index,re)]) + total_datas = local_today_datas[code] + process_index, count = cls.__get_recod(code) + # 瀵绘壘鏈�澶у�� + for index in range(start_index, end_index + 1): + # 鍙鐞嗘定鍋滃崠 + if not L2DataUtil.is_limit_up_price_sell( + total_datas[index]["val"]): + continue + # 涓嶅鐞嗗巻鍙叉暟鎹� + if process_index is not None and process_index >= index: + continue + if count is None: + count = 0 + count += int(total_datas[index]["val"]["num"]) + if process_index is None: + process_index = end_index + cls.__save_recod(code, process_index, count) + + +# 娑ㄥ仠灏佸崟棰濈粺璁� +class L2LimitUpMoneyStatisticUtil: + _redisManager = redis_manager.RedisManager(1) + + @classmethod + def __get_redis(cls): + return cls._redisManager.getRedis() + + # 璁剧疆l2鐨勬瘡涓�绉掓定鍋滃皝鍗曢鏁版嵁 + @classmethod + def __set_l2_second_money_record(cls, code, time, num, from_index, to_index): + old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time) + if old_num is None: + old_num = num + old_from = from_index + old_to = to_index + else: + old_num += num + old_to = to_index + + key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) + + cls.__get_redis().setex(key, tool.get_expire(), json.dumps((old_num, old_from, old_to))) + + @classmethod + def __get_l2_second_money_record(cls, code, time): + key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", "")) + val = cls.__get_redis().get(key) + return cls.__format_second_money_record_val(val) + + @classmethod + def __format_second_money_record_val(cls, val): + if val is None: + return None, None, None + val = json.loads(val) + return val[0], val[1], val[2] + + @classmethod + def __get_l2_second_money_record_keys(cls, code, time_regex): + key = "l2_limit_up_second_money-{}-{}".format(code, time_regex) + keys = cls.__get_redis().keys(key) + return keys + + # 璁剧疆l2鏈�鏂扮殑灏佸崟棰濇暟鎹� + @classmethod + def __set_l2_latest_money_record(cls, code, index, num): + key = "l2_limit_up_money-{}".format(code) + cls.__get_redis().setex(key, tool.get_expire(), json.dumps((num, index))) + + # 杩斿洖鏁伴噺,绱㈠紩 + @classmethod + def __get_l2_latest_money_record(cls, code): + key = "l2_limit_up_money-{}".format(code) + result = cls.__get_redis().get(key) + if result: + result = json.loads(result) + return result[0], result[1] + else: + return 0, -1 + + # 鐭鏁版嵁 + # 鐭鏂规硶涓哄彇鐭鏃堕棿涓や晶鐨勭鍒嗗竷鏁版嵁锛岀敤浜庣‘瀹氳绠楃粨鏉熷潗鏍� + @classmethod + def verify_num(cls, code, num, time_str): + time_ = time_str.replace(":", "") + key = None + for i in range(4, -2, -2): + # 鑾峰彇鏈�(鍒嗛挓/灏忔椂/澶�)鍐呯鍒嗗竷鏁版嵁 + time_regex = "{}*".format(time_[:i]) + keys_ = cls.__get_l2_second_money_record_keys(code, time_regex) + if keys_ and len(keys_) > 1: + # 闇�瑕佹帓搴� + keys = [] + for k in keys_: + keys.append(k) + keys.sort(key=lambda tup: int(tup.split("-")[-1])) + # 鏈�2涓厓绱� + for index in range(0, len(keys) - 1): + time_1 = keys[index].split("-")[-1] + time_2 = keys[index + 1].split("-")[-1] + if int(time_1) <= int(time_) <= int(time_2): + # 鍦ㄦ鏃堕棿鑼冨洿鍐� + if time_ == time_2: + key = keys[index + 1] + else: + key = keys[index] + break + if key: + val = cls.__get_redis().get(key) + old_num, old_from, old_to = cls.__format_second_money_record_val(val) + end_index = old_to + # 淇濆瓨鏈�杩戠殑鏁版嵁 + cls.__set_l2_latest_money_record(code, end_index, num) + break + + # 璁$畻閲忥紝鐢ㄤ簬娑ㄥ仠灏佸崟閲忕殑璁$畻 + @classmethod + def __compute_num(cls, code, data, buy_single_data): + if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]): + # 娑ㄥ仠涔版挙涓庡崠 + return 0 - int(data["val"]["num"]) * data["re"] + else: + # 鍗栨挙 + if L2DataUtil.is_sell_cancel(data["val"]): + # 鍗栨挙鐨勪拱鏁版嵁鏄惁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓嶏紝濡傛灉鍦ㄤ箣鍓嶅氨涓嶈绠楋紝涓嶅湪涔嬪墠灏辫绠� + if l2_data_util.is_sell_index_before_target(data, buy_single_data, + local_today_num_operate_map.get(code)): + return 0 + + return int(data["val"]["num"]) * data["re"] + + @classmethod + def clear(cls, code): + key = "l2_limit_up_money-{}".format(code) + cls.__get_redis().delete(key) + + # 杩斿洖鍙栨秷鐨勬爣蹇楁暟鎹� + # with_cancel 鏄惁闇�瑕佸垽鏂槸鍚︽挙閿� + @classmethod + def process_data(cls, code, start_index, end_index, buy_single_begin_index, with_cancel=True): + start_time = round(t.time() * 1000) + total_datas = local_today_datas[code] + time_dict_num = {} + # 璁板綍璁$畻鐨勫潗鏍� + time_dict_num_index = {} + num_dict = {} + # 缁熻鏃堕棿鍒嗗竷 + time_dict = {} + for i in range(start_index, end_index + 1): + data = total_datas[i] + val = data["val"] + time_ = val["time"] + if time_ not in time_dict: + time_dict[time_] = i + + for i in range(start_index, end_index + 1): + data = total_datas[i] + val = data["val"] + time_ = val["time"] + if time_ not in time_dict_num: + time_dict_num[time_] = 0 + time_dict_num_index[time_] = {"s": i, "e": i} + time_dict_num_index[time_]["e"] = i + num = cls.__compute_num(code, data, total_datas[buy_single_begin_index]) + num_dict[i] = num + time_dict_num[time_] = time_dict_num[time_] + num + for t_ in time_dict_num: + cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"], + time_dict_num_index[t_]["e"]) + + print("淇濆瓨娑ㄥ仠灏佸崟棰濇椂闂达細", round(t.time() * 1000) - start_time) + + # 绱鏈�鏂扮殑閲戦 + total_num, index = cls.__get_l2_latest_money_record(code) + if index == -1: + # 娌℃湁鑾峰彇鍒版渶鏂扮殑鐭灏佸崟棰濓紝闇�瑕佷粠涔板叆淇″彿寮�濮嬬偣璁$畻 + index = buy_single_begin_index - 1 + total_num = 0 + # TODO 寰呬紭鍖栬绠� + cancel_index = None + cancel_msg = None + # 寰呰绠楅噺 + limit_up_price = gpcode_manager.get_limit_up_price(code) + min_volumn = round(10000000 / (limit_up_price * 100)) + # 涓嶅悓鏃堕棿鐨勬暟鎹紑濮嬪潗鏍� + time_start_index_dict = {} + # 鏁版嵁鏃堕棿鍒嗗竷 + time_list = [] + # 鍒板綋鍓嶆椂闂寸疮绉殑涔�1閲� + time_total_num_dict = {} + for i in range(index + 1, end_index + 1): + data = total_datas[i] + time_ = data["val"]["time"] + if time_ not in time_start_index_dict: + # 璁板綍姣忎竴绉掔殑寮�濮嬩綅缃� + time_start_index_dict[time_] = i + # 璁板綍鏃堕棿鍒嗗竷 + time_list.append(time_) + # 涓婁竴娈垫椂闂寸殑鎬绘暟 + time_total_num_dict[time_] = total_num + + val = num_dict.get(i) + if val is None: + val = cls.__compute_num(code, data, total_datas[buy_single_begin_index]) + total_num += val + # 濡傛灉鏄噺灏忛」锛屼笖鍦ㄥ鐞嗘暟鎹殑鑼冨洿鍐咃紝灏遍渶瑕佸垽鏂槸鍚﹁鎾ゅ崟浜� + if val < 0 and start_index <= i <= end_index: + # 绱灏佸崟閲戦灏忎簬1000涓� + if total_num < min_volumn: + cancel_index = i + cancel_msg = "灏佸崟閲戦灏忎簬1000涓�" + break + # 鐩搁偦2s鍐呯殑鏁版嵁鍑忓皬50% + # 涓�1s鐨勬�绘暟 + last_second_total_volumn = time_total_num_dict.get(time_list[-1]) + if last_second_total_volumn > 0 and ( + last_second_total_volumn - total_num) / last_second_total_volumn >= 0.5: + # 鐩搁偦2s鍐呯殑鏁版嵁鍑忓皬50% + cancel_index = i + cancel_msg = "鐩搁偦2s({})鍐呯殑灏佸崟閲忓噺灏�50%({}->{})".format(time_, last_second_total_volumn, + total_num) + break + if not with_cancel: + cancel_index = None + + print("灏佸崟棰濊绠楁椂闂达細", round(t.time() * 1000) - start_time) + process_end_index = end_index + if cancel_index: + process_end_index = cancel_index + # 淇濆瓨鏈�鏂扮疮璁¢噾棰� + # cls.__set_l2_latest_money_record(code, process_end_index, total_num) + if cancel_index: + return total_datas[cancel_index], cancel_msg + return None, None def __get_time_second(time_str): @@ -2035,4 +2369,7 @@ if __name__ == "__main__": - L2TradeDataProcessor.test_can_order() + # 澶勭悊鏁版嵁 + code = "002898" + load_l2_data(code) + L2LimitUpMoneyStatisticUtil.verify_num(code, 70582, "09:42:00") diff --git a/l2_data_manager_new.py b/l2_data_manager_new.py new file mode 100644 index 0000000..6138940 --- /dev/null +++ b/l2_data_manager_new.py @@ -0,0 +1,629 @@ +import datetime +import logging +import random +import time as t + +import big_money_num_manager +import data_process +import global_util +import gpcode_manager +import l2_data_log +import l2_data_manager +import l2_data_util +import l2_trade_factor +import l2_trade_test +import limit_up_time_manager +import log +import redis_manager +import ths_industry_util +import tool +import trade_manager +from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \ + local_today_num_operate_map, L2LimitUpMoneyStatisticUtil +from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process + + +# TODO l2鏁版嵁绠$悊 +class L2DataManager: + # 鏍煎紡鍖栨暟鎹� + def format_data(self, datas): + format_datas = [] + for data in datas: + format_datas.append({"val": data, "re": 1}) + return format_datas + + # 鑾峰彇鏂板鏁版嵁 + def get_add_datas(self, format_datas): + pass + + # 浠庢暟鎹簱鍔犺浇鏁版嵁 + def load_data(self, code=None, force=False): + pass + + # 淇濆瓨鏁版嵁 + def save_datas(self, add_datas, datas): + pass + + +# m鍊煎ぇ鍗曞鐞� +class L2BigNumForMProcessor: + + def __init__(self): + self._redis_manager = redis_manager.RedisManager(1) + + def __get_redis(self): + return self._redis_manager.getRedis() + + # 淇濆瓨璁$畻寮�濮嬩綅缃� + def set_begin_pos(self, code, index): + if self.__get_begin_pos(code) is None: + # 淇濆瓨浣嶇疆 + key = "m_big_money_begin-{}".format(code) + self.__get_redis().setex(key, tool.get_expire(), index) + + # 鑾峰彇璁$畻寮�濮嬩綅缃� + def __get_begin_pos(self, code): + key = "m_big_money_begin-{}".format(code) + val = self.__get_redis().get(key) + if val is None: + return None + return int(val) + + # 娓呴櫎宸茬粡澶勭悊鐨勬暟鎹� + def clear_processed_end_index(self, code): + key = "m_big_money_process_index-{}".format(code) + self.__get_redis().delete(key) + + # 娣诲姞宸茬粡澶勭悊杩囩殑鍗� + def __set_processed_end_index(self, code, index): + key = "m_big_money_process_index-{}".format(code) + self.__get_redis().setex(key, tool.get_expire(), index) + + # 鏄惁宸茬粡澶勭悊杩� + def __get_processed_end_index(self, code): + key = "m_big_money_process_index-{}".format(code) + val = self.__get_redis().get(key) + if val is None: + return None + return int(val) + + # 澶勭悊澶у崟 + def process(self, code, start_index, end_index, limit_up_price): + + begin_pos = self.__get_begin_pos(code) + if begin_pos is None: + # 娌℃湁鑾峰彇鍒板紑濮嬩拱鍏ヤ俊鍙� + return + # 涓婃澶勭悊鍒扮殑鍧愭爣 + processed_index = self.__get_processed_end_index(code) + if processed_index is None: + processed_index = 0 + if processed_index >= end_index: + return + + start_time = round(t.time() * 1000) + total_datas = local_today_datas[code] + + num_splites = [round(5000 / limit_up_price), round(10000 / limit_up_price), round(20000 / limit_up_price), + round(30000 / limit_up_price)] + total_num = 0 + for i in range(max(start_index, processed_index), end_index + 1): + data = total_datas[i] + if not L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) and not L2DataUtil.is_limit_up_price_buy( + data["val"]): + continue + # 濡傛灉鏄定鍋滀拱鎾や俊鍙烽渶瑕佺湅鏁版嵁浣嶇疆鏄惁姣斿紑濮嬪鐞嗘椂闂存棭 + if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): + # 鑾峰彇涔板叆淇″彿 + buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i], + local_today_num_operate_map.get(code)) + if buy_index is not None and buy_index < begin_pos: + continue + + # 璁$畻鎴愪氦閲戦 + num = int(data["val"]["num"]) + temp = 0 + if num < num_splites[0]: + pass + elif num < num_splites[1]: + temp = 1 + elif num < num_splites[2]: + temp = round(4 / 3, 3) + elif num < num_splites[3]: + temp = 2 + else: + temp = 4 + count = int(temp * data["re"] * 1000) + if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): + count = 0 - count + total_num += count + self.__set_processed_end_index(code, end_index) + big_money_num_manager.add_num(code, total_num) + + print("m鍊煎ぇ鍗曡绠楄寖鍥达細{}-{} 鏃堕棿锛歿}".format(max(start_index, processed_index), end_index, + round(t.time() * 1000) - start_time)) + + +class L2TradeDataProcessor: + unreal_buy_dict = {} + random_key = {} + l2BigNumForMProcessor = L2BigNumForMProcessor() + + @classmethod + def debug(cls, code, content, *args): + logger_l2_trade.debug(("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) + + @classmethod + def cancel_debug(cls, code, content, *args): + logger_l2_trade_cancel.debug( + ("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) + + @classmethod + def buy_debug(cls, code, content, *args): + logger_l2_trade_buy.debug( + ("thread-id={} code={} ".format(cls.random_key[code], code) + content).format(*args)) + + @classmethod + # 鏁版嵁澶勭悊鍏ュ彛 + # datas: 鏈鎴浘鏁版嵁 + # capture_timestamp:鎴浘鏃堕棿鎴� + def process(cls, code, datas, capture_timestamp): + cls.random_key[code] = random.randint(0, 100000) + now_time_str = datetime.datetime.now().strftime("%H:%M:%S") + __start_time = round(t.time() * 1000) + try: + if len(datas) > 0: + # 鍒ゆ柇浠锋牸鍖洪棿鏄惁姝g‘ + if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): + raise L2DataException(L2DataException.CODE_PRICE_ERROR, + "鑲′环涓嶅尮閰� code-{} price-{}".format(code, datas[0]["val"]["price"])) + # 鍔犺浇鍘嗗彶鏁版嵁 + l2_data_manager.load_l2_data(code) + # 绾犳鏁版嵁 + datas = l2_data_manager.L2DataUtil.correct_data(code, datas) + _start_index = 0 + if local_today_datas.get(code) is not None and len( + local_today_datas[code]) > 0: + _start_index = local_today_datas[code][-1]["index"] + 1 + add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index) + if len(add_datas) > 0: + # 鎷兼帴鏁版嵁 + local_today_datas[code].extend(add_datas) + l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas) + total_datas = local_today_datas[code] + __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2鏁版嵁棰勫鐞嗘椂闂�") + if len(add_datas) > 0: + _start_time = round(t.time() * 1000) + latest_time = add_datas[len(add_datas) - 1]["val"]["time"] + # 鏃堕棿宸笉鑳藉お澶ф墠鑳藉鐞� + # TODO 鏆傛椂鍏抽棴澶勭悊 + if l2_data_manager.L2DataUtil.is_same_time(now_time_str, latest_time): + # 鍒ゆ柇鏄惁宸茬粡鎸傚崟 + state = trade_manager.get_trade_state(code) + start_index = len(total_datas) - len(add_datas) + end_index = len(total_datas) - 1 + if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: + # 宸叉寕鍗� + cls.__process_order(code, start_index, end_index, capture_timestamp) + else: + # 鏈寕鍗� + cls.__process_not_order(code, start_index, end_index, capture_timestamp) + logger_l2_process.info("code:{} 澶勭悊鏁版嵁鑼冨洿: {}-{} 澶勭悊鏃堕棿:{}", code, add_datas[0]["index"], + add_datas[-1]["index"], round(t.time() * 1000) - __start_time) + __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2鏁版嵁澶勭悊鏃堕棿") + # 淇濆瓨鏁版嵁 + l2_data_manager.save_l2_data(code, datas, add_datas) + __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "淇濆瓨鏁版嵁鏃堕棿") + + finally: + if code in cls.unreal_buy_dict: + cls.unreal_buy_dict.pop(code) + + # 澶勭悊鏈寕鍗� + @classmethod + def __process_not_order(cls, code, start_index, end_index, capture_time): + # 鑾峰彇闃堝�� + threshold_money = cls.__get_threshmoney(code) + cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time) + + @classmethod + def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False): + index, old_buy_count, old_cancel_count = l2_data_manager.TradePointManager.get_count_info_for_cancel_buy(code) + for i in range(start_index, end_index + 1): + buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i) + old_buy_count += buy_count + + old_cancel_count += buy_cancel_count + if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 and has_cancel_single: + return i, True + l2_data_manager.TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count, + old_cancel_count) + return end_index, False + + # 澶勭悊宸叉寕鍗� + @classmethod + def __process_order(cls, code, start_index, end_index, capture_time, new_add=True): + if start_index < 0: + start_index = 0 + + if end_index < start_index: + return + # 鑾峰彇涔板叆淇″彿璧峰鐐� + buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code) + # 鎾ゅ崟璁$畻,鍙湅涔�1 + cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index, + buy_single_index) + + # 璁$畻m鍊煎ぇ鍗� + cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index, + gpcode_manager.get_limit_up_price(code)) + + if cancel_data: + cls.debug(code, "瑙﹀彂鎾ゅ崟锛屾挙鍗曚綅缃細{} 锛屾挙鍗曞師鍥狅細{}", cancel_data["index"], cancel_msg) + # 鎾ゅ崟 + cls.cancel_buy(code) + # 缁х画璁$畻涓嬪崟 + cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time) + else: + # 濡傛灉鏈夎櫄鎷熶笅鍗曢渶瑕佺湡瀹炰笅鍗� + unreal_buy_info = cls.unreal_buy_dict.get(code) + if unreal_buy_info is not None: + cls.debug(code, "鏈夎櫄鎷熶笅鍗曪紝鏃犱拱鎾や俊鍙凤紝寮�濮嬫墽琛屼拱鍏ワ紝鎵ц浣嶇疆锛歿},鎴浘鏃堕棿锛歿}", unreal_buy_info[0], capture_time) + # unreal_buy_info 鐨勫唴瀹规牸寮忎负锛�(瑙︽硶涔版搷浣滀笅鏍�,鎴浘鏃堕棿) + # 鐪熷疄涓嬪崟 + cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]], + unreal_buy_info[0]) + + @classmethod + def __buy(cls, code, capture_timestamp, last_data, last_data_index): + can, reason = cls.__can_buy(code) + # 涓嶈兘璐拱 + if not can: + cls.debug(code, "涓嶅彲浠ヤ笅鍗曪紝鍘熷洜锛歿}", reason) + return + else: + cls.debug(code, "鍙互涓嬪崟锛屽師鍥狅細{}", reason) + + # 鍒犻櫎铏氭嫙涓嬪崟 + if code in cls.unreal_buy_dict: + cls.unreal_buy_dict.pop(code) + cls.debug(code, "寮�濮嬫墽琛屼拱鍏�") + try: + trade_manager.start_buy(code, capture_timestamp, last_data, + last_data_index) + l2_data_manager.TradePointManager.delete_buy_cancel_point(code) + cls.debug(code, "鎵ц涔板叆鎴愬姛") + except Exception as e: + cls.debug(code, "鎵ц涔板叆寮傚父:{}", str(e)) + pass + finally: + cls.debug(code, "m鍊煎奖鍝嶅洜瀛愶細{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code)) + + # 鏄惁鍙互涔� + @classmethod + def __can_buy(cls, code): + limit_up_time = limit_up_time_manager.get_limit_up_time(code) + if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second( + limit_up_time) >= l2_data_manager.L2DataUtil.get_time_as_second( + "14:30:00"): + return False, "14:30鍚庢定鍋滅殑涓嶈兘涔帮紝娑ㄥ仠鏃堕棿涓簕}".format(limit_up_time) + + # 鍚屼竴鏉垮潡涓�佷簩鍚庨潰鐨勪笉鑳戒拱 + industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list()) + if industry is None: + return True, "娌℃湁鑾峰彇鍒拌涓�" + codes_index = limit_up_time_manager.sort_code_by_limit_time(codes) + if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1: + return False, "鍚屼竴鏉垮潡涓�佷笁,鑰佸洓,...涓嶈兘涔�" + + # 13:00鍚庢定鍋滐紝鏈澘鍧椾腑娑ㄥ仠绁ㄦ暟<29涓嶈兘涔� + limit_up_time = limit_up_time_manager.get_limit_up_time(code) + if limit_up_time is not None: + if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None: + if global_util.industry_hot_num.get(industry) < 29: + return False, "13:00鍚庢定鍋滐紝鏈澘鍧椾腑娑ㄥ仠绁ㄦ暟<29涓嶈兘涔�" + # 鑰佷簩锛屾湰鏉垮潡涓定鍋滅エ鏁�<29 涓嶈兘涔� + if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get( + industry) is not None: + if global_util.industry_hot_num.get(industry) < 29: + return False, "鑰佷簩锛屾湰鏉垮潡涓定鍋滅エ鏁�<29涓嶈兘涔�" + # 鍙互涓嬪崟 + return True, None + + @classmethod + def __cancel_buy(cls, code): + try: + cls.debug(code, "寮�濮嬫墽琛屾挙鍗�") + trade_manager.start_cancel_buy(code) + # 鍙栨秷涔板叆鏍囪瘑 + l2_data_manager.TradePointManager.delete_buy_point(code) + l2_data_manager.TradePointManager.delete_buy_cancel_point(code) + l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code) + l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code) + # 鍒犻櫎澶х兢鎾や簨浠剁殑澶у崟 + l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code) + cls.debug(code, "鎵ц鎾ゅ崟鎴愬姛") + except Exception as e: + cls.debug(code, "鎵ц鎾ゅ崟寮傚父锛歿}", str(e)) + + @classmethod + def cancel_buy(cls, code): + # 鍒犻櫎澶х兢鎾や簨浠剁殑澶у崟 + l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code) + l2_data_manager.L2ContinueLimitUpCountManager.del_data(code) + + if code in cls.unreal_buy_dict: + cls.unreal_buy_dict.pop(code) + # 鍙栨秷涔板叆鏍囪瘑 + l2_data_manager.TradePointManager.delete_buy_point(code) + l2_data_manager.TradePointManager.delete_buy_cancel_point(code) + l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code) + l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code) + # 鍒犻櫎澶х兢鎾や簨浠剁殑澶у崟 + l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code) + else: + cls.__cancel_buy(code) + + l2_data_manager.L2BigNumProcessor.del_big_num_pos(code) + + @classmethod + def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time, + new_add=True): + if compute_end_index < compute_start_index: + return + + total_datas = local_today_datas[code] + # 鑾峰彇涔板叆淇″彿璁$畻璧峰浣嶇疆 + buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code) + # 鏄惁涓烘柊鑾峰彇鍒扮殑浣嶇疆 + if buy_single_index is None: + # 鏈変拱鍏ヤ俊鍙� + has_single, _index = cls.__compute_order_begin_pos(code, max( + compute_start_index - 2 if new_add else compute_start_index, 0), 3, compute_end_index) + buy_single_index = _index + if has_single: + num = 0 + count = 0 + cls.debug(code, "鑾峰彇鍒颁拱鍏ヤ俊鍙疯捣濮嬬偣锛歿} 鏁版嵁锛歿}", buy_single_index, total_datas[buy_single_index]) + # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟寮�濮嬩俊鍙凤紝闇�瑕佽缃ぇ鍗曡捣濮嬬偣 + cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index) + + if buy_single_index is None: + # 鏈幏鍙栧埌涔板叆淇″彿锛岀粓姝㈢▼搴� + return None + # 璁$畻m鍊煎ぇ鍗� + cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index, + gpcode_manager.get_limit_up_price(code)) + threshold_money = cls.__get_threshmoney(code) + # 涔板叆绾拱棰濈粺璁� + compute_index, buy_nums, buy_count, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index, + compute_start_index), + compute_end_index, num, + count, threshold_money, + buy_single_index, + capture_time) + # 涔板叆淇″彿浣嶄笌璁$畻浣嶇疆闂撮殧2s鍙婁互涓婁簡 + if rebegin_buy_pos is not None: + # 闇�瑕侀噸鏂拌绠楃函涔伴 + cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False) + return + + if compute_index is not None: + cls.debug(code, "鑾峰彇鍒颁拱鍏ユ墽琛屼綅缃細{} m鍊硷細{} 绾拱鎵嬫暟锛歿} 绾拱鍗曟暟锛歿} 鏁版嵁锛歿}", compute_index, threshold_money, buy_nums, + buy_count, + total_datas[compute_index]) + # 璁板綍涔板叆淇″彿浣嶇疆 + cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count) + # 濡傛灉鏄粖澶╃涓�娆℃湁涓嬪崟鎵ц淇″彿锛屾定鍋滄椂闂达紙涔板叆鎵ц浣嶆椂闂达級 + limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"]) + # 铏氭嫙涓嬪崟 + cls.unreal_buy_dict[code] = (compute_index, capture_time) + # 鍒犻櫎涔嬪墠鐨勬墍鏈夋挙鍗曚俊鍙� + l2_data_manager.TradePointManager.delete_buy_cancel_point(code) + + # 娑ㄥ仠灏佸崟棰濊绠� + L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index, False) + + # 鏁版嵁鏄惁澶勭悊瀹屾瘯 + if compute_index >= compute_end_index: + cls.debug(code, "鏁版嵁澶勭悊瀹屾瘯锛屼笅鍗�, 鏁版嵁鎴浘鏃堕棿-{}", capture_time) + # 鏁版嵁宸茬粡澶勭悊瀹屾瘯锛屽鏋滆繕娌℃挙鍗曞氨瀹為檯涓嬪崟 + cls.__buy(code, capture_time, total_datas[compute_index], compute_index) + else: + # 鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞� + cls.debug(code, "鏁版嵁灏氭湭澶勭悊瀹屾瘯锛岃繘琛屼笅涓�姝ュ鐞嗭紝澶勭悊杩涘害锛歿}", compute_index) + # 澶勭悊鎾ゅ崟姝ラ + cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False) + else: + # 鏈揪鍒颁笅鍗曟潯浠讹紝淇濆瓨绾拱棰濓紝璁剧疆绾拱棰� + # 璁板綍涔板叆淇″彿浣嶇疆 + cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count) + pass + + # 鑾峰彇涓嬪崟璧峰淇″彿 + @classmethod + def __get_order_begin_pos(cls, code): + buy_single_index, buy_exec_index, compute_index, num, count = l2_data_manager.TradePointManager.get_buy_compute_start_data( + code) + return buy_single_index, buy_exec_index, compute_index, num, count + + # 淇濆瓨涓嬪崟璧峰淇″彿 + @classmethod + def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count): + TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count) + + # 璁$畻涓嬪崟璧峰淇″彿 + # compute_data_count 鐢ㄤ簬璁$畻鐨刲2鏁版嵁鏁伴噺 + @classmethod + def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index): + second_930 = 9 * 3600 + 30 * 60 + 0 + # 鍊掓暟100鏉℃暟鎹煡璇� + datas = local_today_datas[code] + if end_index - start_index + 1 < continue_count: + return False, None + __time = None + + last_index = None + count = 0 + start = None + + for i in range(start_index, end_index + 1): + _val = datas[i]["val"] + # 鏃堕棿瑕�>=09:30:00 + if L2DataUtil.get_time_as_second(_val["time"]) < second_930: + continue + + if L2DataUtil.is_limit_up_price_buy(_val): + + if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]): + if start is None: + start = i + last_index = i + count += datas[i]["re"] + if count >= continue_count: + return True, start + else: + # 鏈潯鏁版嵁浣滀负璧风偣 + last_index = i + count = datas[i]["re"] + start = i + + elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val): + # 鍓旈櫎鍗栦笌鍗栨挙 + last_index = None + count = 0 + start = None + + return False, None + + @classmethod + def __get_threshmoney(cls, code): + return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code) + + # 缁熻涔板叆鍑�涔伴噺锛屼笉璁$畻鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓嶇殑涔版挙鍗� + @classmethod + def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count, + threshold_money, buy_single_index, capture_time): + total_datas = local_today_datas[code] + buy_nums = origin_num + buy_count = origin_count + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is None: + raise Exception("娑ㄥ仠浠锋棤娉曡幏鍙�") + # 鐩爣鎵嬫暟 + threshold_num = threshold_money / (limit_up_price * 100) + # 鐩爣璁㈠崟鏁伴噺 + threshold_count = l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code) + + buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"]) + for i in range(compute_start_index, compute_end_index + 1): + data = total_datas[i] + _val = total_datas[i]["val"] + if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 1: + TradePointManager.delete_buy_point(code) + if i == compute_end_index: + # 鏁版嵁澶勭悊瀹屾瘯 + return None, buy_nums, buy_count, None + else: + # 璁$畻涔板叆淇″彿锛屼笉鑳藉悓涓�鏃堕棿寮�濮嬭绠� + for ii in range(buy_single_index + 1, compute_end_index + 1): + if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]: + return None, buy_nums, buy_count, ii + + # 娑ㄥ仠涔� + if L2DataUtil.is_limit_up_price_buy(_val): + # 娑ㄥ仠涔� + buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) + buy_count += int(total_datas[i]["re"]) + if buy_nums >= threshold_num and buy_count >= threshold_count: + logger_l2_trade_buy.info("{}鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛歿} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿} 缁熻绾拱鍗曟暟锛歿} 鐩爣绾拱鍗曟暟锛歿}", code, i, buy_nums, + threshold_num, buy_count, threshold_count) + elif L2DataUtil.is_limit_up_price_buy_cancel(_val): + # 娑ㄥ仠涔版挙 + # 鍒ゆ柇涔板叆浣嶇疆鏄惁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓� + buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i], + local_today_num_operate_map.get(code)) + if buy_index is not None: + # 鎵惧埌涔版挙鏁版嵁鐨勪拱鍏ョ偣 + if buy_index >= buy_single_index: + buy_nums -= int(_val["num"]) * int(data["re"]) + buy_count -= int(data["re"]) + cls.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍚� 鎾や拱绾拱鎵嬫暟锛歿} 鐩爣鎵嬫暟锛歿}", i, buy_nums, threshold_num) + else: + cls.buy_debug(code, "{}鏁版嵁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓嶏紝涔板叆浣嶏細{}", i, buy_index) + if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]: + # 鍚屼竴绉�,褰撲綔涔板叆淇″彿涔嬪悗澶勭悊 + buy_nums -= int(_val["num"]) * int(data["re"]) + buy_count -= int(data["re"]) + cls.buy_debug(code, "{}鏁版嵁涔板叆浣嶄笌棰勪及涔板叆浣嶅湪鍚屼竴绉�", i) + else: + # 鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐� + cls.buy_debug(code, "鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐�: 浣嶇疆-{} 鏁版嵁-{}", i, data) + buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) + buy_count -= int(total_datas[i]["re"]) + cls.buy_debug(code, "浣嶇疆-{}锛屾�绘墜鏁帮細{}锛岀洰鏍囨墜鏁帮細{}", i, + buy_nums, threshold_num) + # 鏈夋挙鍗曚俊鍙凤紝涓斿皬浜庨槇鍊� + if buy_nums >= threshold_num and buy_count >= threshold_count: + return i, buy_nums, buy_count, None + + cls.buy_debug(code, "灏氭湭鑾峰彇鍒颁拱鍏ユ墽琛岀偣锛岃捣濮嬭绠椾綅缃細{} 缁熻绾拱鎵嬫暟锛歿} 鐩爣绾拱鎵嬫暟锛歿} 缁熻绾拱鍗曟暟锛歿} 鐩爣绾拱鍗曟暟锛歿}", compute_start_index, + buy_nums, + threshold_num, buy_count, threshold_count) + return None, buy_nums, buy_count, None + + @classmethod + def test(cls): + code = "002898" + l2_trade_test.clear_trade_data(code) + load_l2_data(code, True) + + if True: + state = trade_manager.get_trade_state(code) + cls.random_key[code] = random.randint(0, 100000) + capture_timestamp = 1999988888 + try: + if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: + # 宸叉寕鍗� + cls.__process_order(code, 0, 140, capture_timestamp) + else: + # 鏈寕鍗� + cls.__process_not_order(code, 0, 140, capture_timestamp) + except Exception as e: + logging.exception(e) + return + + _start = t.time() + # 鎸塻鎵归噺鍖栨暟鎹� + total_datas = local_today_datas[code] + start_time = total_datas[0]["val"]["time"] + start_index = 0 + for i in range(0, len(total_datas)): + if total_datas[i]["val"]["time"] != start_time: + cls.random_key[code] = random.randint(0, 100000) + # 澶勭悊鏁版嵁 + start = start_index + # if start != 201: + # continue + end = i - 1 + print("澶勭悊杩涘害锛歿},{}".format(start, end)) + capture_timestamp = 1999999999 + state = trade_manager.get_trade_state(code) + try: + if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: + # 宸叉寕鍗� + cls.__process_order(code, start, end, capture_timestamp) + else: + # 鏈寕鍗� + cls.__process_not_order(code, start, end, capture_timestamp) + except Exception as e: + logging.exception(e) + # t.sleep(1) + start_index = i + start_time = total_datas[i]["val"]["time"] + + print("鏃堕棿鑺辫垂:", round((t.time() - _start) * 1000)) + + +if __name__ == "__main__": + L2TradeDataProcessor.test() diff --git a/l2_data_util.py b/l2_data_util.py index c3de649..4fe0481 100644 --- a/l2_data_util.py +++ b/l2_data_util.py @@ -13,6 +13,19 @@ from trade_gui import async_call +def run_time(): + def decorator(func): + def infunc(*args, **kwargs): + start = round(time.time() * 1000) + result = func(args, **kwargs) + print("鎵ц鏃堕棿", round(time.time() * 1000) - start) + return result + + return infunc + + return decorator + + def compare_time(time1, time2): result = int(time1.replace(":", "", 2)) - int(time2.replace(":", "", 2)) return result @@ -110,6 +123,33 @@ return None, None +# 鍒ゆ柇鍗栨挙鐨勫崠淇″彿鏄惁鍦ㄧ洰鏍囦俊鍙蜂箣鍓� +def is_sell_index_before_target(sell_cancel_data, target_data, local_today_num_operate_map): + min_space, max_space = compute_time_space_as_second(sell_cancel_data["val"]["cancelTime"], + sell_cancel_data["val"]["cancelTimeUnit"]) + max_time = __sub_time(sell_cancel_data["val"]["time"], min_space) + min_time = __sub_time(sell_cancel_data["val"]["time"], max_space) + # 濡傛灉鏈�澶у�奸兘鍦ㄧ洰鏍囦俊鍙蜂箣鍓嶅垯淇″彿鑲畾鍦ㄧ洰鏍囦俊鍙蜂箣鍓� + if int(target_data["val"]["time"].replace(":", "")) > int(max_time.replace(":", "")): + return True + sell_datas = local_today_num_operate_map.get( + "{}-{}-{}".format(sell_cancel_data["val"]["num"], "2", sell_cancel_data["val"]["price"])) + for i in range(0, len(sell_datas)): + data = sell_datas[i] + if int(data["val"]["operateType"]) != 2: + continue + if int(data["val"]["num"]) != int(sell_cancel_data["val"]["num"]): + continue + if min_space == 0 and max_space == 0: + # 鏈鍐� + if compare_time(data["val"]["time"], min_time) == 0: + return data["index"] < target_data["index"] + # 鏁版嵁鍦ㄦ纭殑鍖洪棿 + elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0: + return data["index"] < target_data["index"] + return False + + __last_big_data = {} @@ -140,17 +180,127 @@ break +# l2鏁版嵁鎷兼帴宸ュ叿 +class L2DataConcatUtil: + + # 鍒濆鍖� + def __init__(self, code, last_datas, datas): + self.last_datas = last_datas + self.datas = datas + self.code = code + + def __get_data_identity(self, data_): + data=data_["val"] + return "{}-{}-{}-{}-{}-{}".format(data.get("time"), data.get("num"), data.get("price"), data.get("operateType"), + data.get("cancelTime"), data.get("cancelTimeUnit")) + + # 鑾峰彇鎷兼帴鐨勭壒寰�,鑾峰彇鏈�鍚�3绗� + def __get_concat_feature(self): + # 鏈�灏戦渶瑕�3鏉℃暟鎹�+2鏉¢渶瑕佹湁鐗瑰緛鐐圭殑鏁版嵁 + min_identity = 2 + min_count = 3 + + identity_set = set() + count = 0 + start_index = -1 + for i in range(len(self.last_datas) - 1, -1, -1): + identity_set.add(self.__get_data_identity(self.last_datas[i])) + count += 1 + start_index = i + if count >= min_count and len(identity_set) >= min_identity: + break + return start_index, len(self.last_datas) - 1 + + # 鑾峰彇鏂板鏁版嵁 + def get_add_datas(self): + # 鏌ヨ褰撳墠鏁版嵁鏄惁鍦ㄦ渶杩戜竴娆℃暟鎹箣鍚� + if self.last_datas and self.datas: + if int(self.datas[-1]["val"]["time"].replace(":", "")) - int(self.last_datas[-1]["val"]["time"].replace(":", "")) < 0: + return [] + + # 鑾峰彇鎷兼帴鐐� + start_index, end_index = self.__get_concat_feature() + if start_index < 0: + return self.datas + print("鐗瑰緛浣嶇疆锛�", start_index, end_index) + # 鎻愬彇鐗瑰緛鐐圭殑鏍囪瘑鏁版嵁 + identity_list = [] + for i in range(start_index, end_index + 1): + identity_list.append(self.__get_data_identity(self.last_datas[i])) + + # 鏌ユ壘瀹屾暣鐨勭壒寰� + identity_count = len(identity_list) + for n in range(0, identity_count): + # 姣忔閬嶅巻鍑忓皯鏈�鍓嶉潰涓�涓壒寰侀噺 + for i in range(0, len(self.datas) - len(identity_list) + n): + if self.__get_data_identity(self.datas[i]) == identity_list[n]: + # n==0 琛ㄧず瀹屽叏鍖归厤 锛� i=0 琛ㄧず鍗充娇涓嶆槸瀹屽叏鍖归厤锛屼絾蹇呴』鏂版暟鎹涓�涓厓绱犲尮閰� + if n == 0 or i == 0: + find_identity = True + for j in range(n + 1, len(identity_list)): + if identity_list[j] != self.__get_data_identity(self.datas[i + j - n]): + find_identity = False + break + + if find_identity: + return self.datas[i + len(identity_list) - n:] + else: + continue + print("鏂版暟鎹腑鏈壘鍒扮壒寰佹爣璇�") + return self.datas + + +def test_add_datas(): + def load_data(datas): + data_list = [] + for data in datas: + data_list.append({"val":{"time": data}}) + return data_list + + # 涓嶅尮閰� + latest_datas = [] + datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + # 涓嶅尮閰� + latest_datas = ["10:00:02"] + datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + # 涓嶅尮閰� + latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:03"] + datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + # 鍖归厤 + latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:03"] + datas = ["10:00:01", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:02"] + datas = ["10:00:02", "10:00:02", "10:00:03", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + latest_datas = ["10:00:00", "10:00:01", "10:00:02", "10:00:02"] + datas = ["10:00:02", "10:00:02", "10:00:00", "10:00:01", "10:00:02", "10:00:02", "10:00:04", "10:00:05"] + latest_datas = load_data(latest_datas) + datas = load_data(datas) + print(L2DataConcatUtil("000333", latest_datas, datas).get_add_datas()) + + def test(datas): datas["code"] = "test" if __name__ == "__main__": - # cancel_data = {"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 1, "time": "09:32:30"}} - # today_datas=[{"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 0, "time": "09:32:30"}},{"val": {"operateType": 0, "num": 1520, "cancelTime": 0, "cancelTimeUnit": 0, "time": "09:31:31"}}] - # result= get_buy_data_with_cancel_data(cancel_data,today_datas) - # print(result) - code = "001209" - l2_data_manager.load_l2_data(code) - total_datas = l2_data_manager.local_today_datas[code] - index, data = get_buy_data_with_cancel_data(total_datas[118], l2_data_manager.local_today_num_operate_map.get(code)) - print(index, data) + test_add_datas() diff --git a/l2_trade_factor.py b/l2_trade_factor.py index aa783ba..cda2229 100644 --- a/l2_trade_factor.py +++ b/l2_trade_factor.py @@ -18,53 +18,57 @@ yi = 1 return 5000000 + (yi - 1) * 500000 - # 鑷敱娴侀�氬競鍊煎奖鍝嶆瘮渚� - @classmethod - def get_zylt_rate(cls, zyltgb): - yi = round(zyltgb / 100000000) - if yi < 1: - yi = 1 - if yi <= 30: - rate = -0.04 + 0.01 * (yi - 1) - if rate > 0.1: - rate = 0.1 - else: - rate = 0.09 - (yi - 31) * 0.002 - if rate < -0.1: - rate = -0.1 - return round(rate, 4) - # 鑾峰彇琛屼笟褰卞搷姣斾緥 # total_limit_percent涓虹粺璁$殑姣斾緥涔嬪拰涔樹互100 @classmethod def get_industry_rate(cls, total_limit_percent): t = total_limit_percent / 10 - rate = t / 0.5 * 0.02 + 0.26 - if rate > 0.52: - rate = 0.52 - return round(rate, 2) + if t < 0.9: + return 0 + elif t <= 1.1: + return 0.2 + elif t <= 1.6: + return 0 + elif t <= 2.1: + return 0.03 + elif t <= 2.6: + return 0.06 + elif t <= 3.1: + return 0.09 + elif t <= 3.6: + return 0.12 + elif t <= 4.1: + return 0.15 + elif t <= 4.6: + return 0.18 + elif t <= 5.1: + return 0.21 + elif t <= 5.6: + return 0.24 + elif t <= 6.1: + return 0.27 + else: + return 0.30 # 鑾峰彇閲忓奖鍝嶆瘮渚� @classmethod def get_volumn_rate(cls, day60_max, yest, today): old_volumn = yest - base_rate = 0.49 if day60_max > yest: old_volumn = day60_max - base_rate = 0.50 r = round(today / old_volumn, 2) + if r < 0.01: + r = 0.01 print("姣斾緥锛�", r) rate = 0 - if r <= 0.25: - rate = base_rate - (r - 0.01) * 2 - elif r <= 0.5: - rate = 0.25 - r + (0.01 if day60_max > yest else 0) - elif r < 0.75: - rate = r - 0.75 + (0.01 if day60_max > yest else 0) - elif r < 1.74: - rate = base_rate - (r - 0.75) + if r < 0.5: + rate = 0.3 - (r - 0.01) + elif r <= 0.75: + rate = -0.2 + (r - 0.5) * 2 + elif r <= 1.35: + rate = 0.3 - (r - 0.75) else: - rate = base_rate - 0.99 + rate = -0.3 return round(rate, 4) # 褰撳墠鑲$エ棣栨娑ㄥ仠鏃堕棿鐨勫奖鍝嶆瘮渚� @@ -74,36 +78,33 @@ start_m = 9 * 60 + 30 m = int(times[0]) * 60 + int(times[1]) dif = m - start_m - base_rate = 0.5 + base_rate = 0.3 rate = 0 if dif < 1: rate = base_rate - elif dif <= 5: - rate = base_rate - dif * 0.02 elif dif <= 120: # 11:30涔嬪墠 - rate = 0.39 - (dif - 6) * 0.004 + rate = base_rate - dif * 0.0035 else: - rate = 0.39 - (120 - 6) * 0.004 - (dif - 210 + 1) * 0.004 - if rate < -0.5: - rate = -0.5 + rate = base_rate - (dif - 89) * 0.0035 + if rate < -0.3020: + rate = -0.3020 return round(rate, 4) # 绾竾鎵嬪摜褰卞搷鍊硷紙鎵嬫暟銆�=9000 OR 閲戦銆�=500w锛� @classmethod def get_big_money_rate(cls, num): - if num < 0: - num = 0 - if num >= 10: - return 0.5 - else: - return round(num * 0.05, 2) + if num < 4: + return 0 + rate = (num - 4) * 0.035 / 4 + 0.06 + if rate > 0.9: + rate = 0.9 + return round(rate, 4) @classmethod def compute_rate(cls, zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today, limit_up_time, big_money_num): - # 鑷敱娴侀�氳偂鏈奖鍝嶆瘮渚� - zyltgb_rate = cls.get_zylt_rate(zyltgb) + # 琛屼笟娑ㄥ仠褰卞搷姣斾緥 industry_rate = 0 if total_industry_limit_percent is not None: @@ -121,16 +122,23 @@ if big_money_num is not None: big_money_rate = cls.get_big_money_rate(big_money_num) print( - "zyltgb_rate锛歿} industry_rate锛歿} volumn_rate锛歿} limit_up_time_rate锛歿} big_money_rate锛歿}".format(zyltgb_rate, - industry_rate, - volumn_rate, - limit_up_time_rate, - big_money_rate)) + "industry_rate锛歿} volumn_rate锛歿} limit_up_time_rate锛歿} big_money_rate锛歿}".format(industry_rate, + volumn_rate, + limit_up_time_rate, + big_money_rate)) - return round(1 - (zyltgb_rate + industry_rate + volumn_rate + limit_up_time_rate + big_money_rate), 4) + final_rate = round(1 - (industry_rate + volumn_rate + limit_up_time_rate + big_money_rate), 4) + if final_rate < 0.1: + final_rate = 0.1 + return final_rate @classmethod def compute_rate_by_code(cls, code): + factors = cls.__get_rate_factors(code) + return cls.compute_rate(factors[0], factors[1], factors[2], factors[3], factors[4], factors[5], factors[6]) + + @classmethod + def __get_rate_factors(cls, code): zyltgb = global_util.zyltgb_map.get(code) # 鑾峰彇琛屼笟鐑害 industry = global_util.code_industry_map.get(code) @@ -139,6 +147,11 @@ industry = global_util.code_industry_map.get(code) total_industry_limit_percent = global_util.industry_hot_num.get(industry) if industry is not None else None + # 褰撳墠绁ㄦ槸鍚︽定鍋� + if total_industry_limit_percent is not None: + if code in global_util.limit_up_codes_percent: + total_industry_limit_percent -= global_util.limit_up_codes_percent[code] + # 鑾峰彇閲� volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get( code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code) @@ -154,8 +167,22 @@ big_money_num = global_util.big_money_num.get(code) if big_money_num is None: big_money_num = big_money_num_manager.get_num(code) - return cls.compute_rate(zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today, - limit_up_time, big_money_num) + return ( + zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today, limit_up_time, + big_money_num) + + @classmethod + def factors_to_string(cls, code): + vals = cls.__get_rate_factors(code) + return "zyltgb:%s, total_industry_limit_percent:%s, volumn_day60_max:%s, volumn_yest:%s, volumn_today:%s,limit_up_time:%s, big_money_num:%s" % vals + + @classmethod + def __get_zyltgb(cls, code): + zyltgb = global_util.zyltgb_map.get(code) + if zyltgb is None: + global_util.load_zyltgb() + zyltgb = global_util.zyltgb_map.get(code) + return zyltgb @classmethod def compute_m_value(cls, code): @@ -166,12 +193,24 @@ if zyltgb is None: print("娌℃湁鑾峰彇鍒拌嚜鐢辨祦閫氬競鍊�") return 10000000 - if code == '002476': - print("") zyltgb = cls.get_base_safe_val(zyltgb) rate = cls.compute_rate_by_code(code) # print("m鍊艰幏鍙栵細", code, round(zyltgb * rate)) return round(zyltgb * rate) + + # 鑾峰彇瀹夊叏绗旀暟 + @classmethod + def get_safe_buy_count(cls, code): + gb = cls.__get_zyltgb(code) + if not gb: + # 榛樿10绗� + return 8 + count = gb // 100000000 + if count > 30: + return 30 + if count < 5: + return 5 + return count # l2鍥犲瓙褰掑洜鏁版嵁 @@ -191,7 +230,14 @@ if __name__ == "__main__": - L2TradeFactorUtil.compute_m_value("000036") - # print(L2TradeFactorUtil.get_big_money_rate(1)) + # print(L2TradeFactorUtil.get_rate_factors("003004")) + # print(L2TradeFactorUtil.factors_to_string("003004")) + print(L2TradeFactorUtil.get_limit_up_time_rate("09:30:30")) + print(L2TradeFactorUtil.get_limit_up_time_rate("11:30:00")) + print(L2TradeFactorUtil.get_limit_up_time_rate("13:00:00")) + print(L2TradeFactorUtil.get_limit_up_time_rate("13:48:00")) + print(L2TradeFactorUtil.get_limit_up_time_rate("13:53:23")) + print(L2TradeFactorUtil.get_limit_up_time_rate("14:23:23")) + # print(L2TradeFactorUtil.get_big_money_rate(2)) # print(L2TradeFactorUtil.get_big_money_rate(3)) diff --git a/l2_trade_test.py b/l2_trade_test.py new file mode 100644 index 0000000..6cf7259 --- /dev/null +++ b/l2_trade_test.py @@ -0,0 +1,29 @@ +# 浜ゆ槗娴嬭瘯 +# 娓呴櫎浜ゆ槗鏁版嵁 +import big_money_num_manager +import redis_manager +from l2_data_manager import TradePointManager + + +def clear_trade_data(code): + redis_l2 = redis_manager.RedisManager(1).getRedis() + keys = ["buy1_volumn_latest_info-{}", "m_big_money_begin-{}", "m_big_money_process_index-{}"] + for k in keys: + redis_l2.delete(k.format(code)) + TradePointManager.delete_buy_point(code) + big_money_num_manager.reset(code) + redis_trade = redis_manager.RedisManager(2).getRedis() + redis_trade.delete("trade-state-{}".format(code)) + + redis_info = redis_manager.RedisManager(0).getRedis() + keys = redis_info.keys("*{}*".format(code)) + for k in keys: + if k.find("pre") is not None: + continue + if k.find("zyltgb") is not None: + continue + + redis_info.delete(k) + + + diff --git a/limit_up_time_manager.py b/limit_up_time_manager.py index bfc3d52..f9f62d5 100644 --- a/limit_up_time_manager.py +++ b/limit_up_time_manager.py @@ -37,6 +37,7 @@ global_util.limit_up_time[code] = redis.get(key) +# 鏉垮潡寮哄害鎺掑簭 def sort_code_by_limit_time(codes): if not global_util.limit_up_time: load_limit_up_time() @@ -47,10 +48,23 @@ list.append((code, limit_up_time)) new_s = sorted(list, key=lambda e: int(e[1].replace(":", ""))) dict_ = {} + # 鐩稿悓鍊间负鍚屼竴鎺掑簭 + sort_index = 0 for i in range(0, len(new_s)): - dict_[new_s[i][0]] = i + if new_s[i - 1][1] != new_s[i][1] and i > 0: + sort_index += 1 + dict_[new_s[i][0]] = sort_index return dict_ if __name__ == "__main__": - print(sort_code_by_limit_time(["002393", "002476", "002614", "002750", "600082", "002751"])) + list = [("1234578", "09:00:03"), ("12345", "09:00:00"), ("123456", "09:00:00"), ("123457", "09:00:03")] + new_s = sorted(list, key=lambda e: int(e[1].replace(":", ""))) + dict_ = {} + # 鐩稿悓鍊间负鍚屼竴鎺掑簭 + sort_index = 0 + for i in range(0, len(new_s)): + if new_s[i - 1][1] != new_s[i][1] and i > 0: + sort_index += 1 + dict_[new_s[i][0]] = sort_index + print(dict_) diff --git a/log.py b/log.py index 3aa707c..cc7143f 100644 --- a/log.py +++ b/log.py @@ -1,15 +1,20 @@ """ 鏃ュ織 """ - - - +import datetime +import os from loguru import logger def get_path(dir_name, log_name): return "D:/logs/gp/{}/{}".format(dir_name, log_name) + ".{time:YYYY-MM-DD}.log" + + +logger.add(get_path("l2", "l2_process_time"), filter=lambda record: record["extra"].get("name") == "l2_process_time", + rotation="00:00", compression="zip", enqueue=True) +logger_l2_process_time = logger.bind(name="l2_process_time") +logger.remove(handler_id=None) # 姣忎竴澶╃敓鎴愪竴涓棩蹇楁枃浠讹紝鍘嗗彶鏃ュ織鏂囦欢閲囩敤zip鍘嬬缉,寮傛鍐欏叆鏃ュ織 @@ -26,9 +31,9 @@ logger.add(get_path("l2", "l2_process"), filter=lambda record: record["extra"].get("name") == "l2_process", rotation="00:00", compression="zip", enqueue=True) + logger.add(get_path("l2", "l2_trade"), filter=lambda record: record["extra"].get("name") == "l2_trade", rotation="00:00", compression="zip", enqueue=True) - logger.add(get_path("l2", "l2_trade_cancel"), filter=lambda record: record["extra"].get("name") == "l2_trade_cancel", rotation="00:00", compression="zip", enqueue=True) @@ -50,15 +55,17 @@ logger.add(get_path("system", "system"), filter=lambda record: record["extra"].get("name") == "system", rotation="00:00", compression="zip", enqueue=True) + + logger_trade_gui = logger.bind(name="trade_gui") logger_trade = logger.bind(name="trade") logger_trade_delegate = logger.bind(name="delegate") logger_l2_error = logger.bind(name="l2_error") logger_l2_process = logger.bind(name="l2_process") + logger_l2_trade = logger.bind(name="l2_trade") logger_l2_trade_cancel = logger.bind(name="l2_trade_cancel") logger_l2_trade_buy = logger.bind(name="l2_trade_buy") - logger_l2_big_data = logger.bind(name="l2_big_data") logger_juejin_tick = logger.bind(name="juejin_tick") @@ -66,5 +73,52 @@ logger_device = logger.bind(name="device") logger_system = logger.bind(name="system") + +class LogUtil: + @classmethod + def extract_log_from_key(cls, key, path, target_path): + fw = open(target_path, mode='w', encoding="utf-8") + try: + with open(path, 'r', encoding="utf-8") as f: + lines = f.readlines() + for line in lines: + if line.find("{}".format(key)) > 0: + fw.write(line) + finally: + fw.close() + + +# 瀵煎嚭鏁版嵁澶勭悊浣嶇疆鏃ュ織 +def __export_l2_pos_range(code, date, dir): + LogUtil.extract_log_from_key("{} 澶勭悊鏁版嵁鑼冨洿".format(code), "D:/logs/gp/l2/l2_process.{}.log".format(date), + "{}/l2_process_{}.log".format(dir, date)) + + +# 瀵煎嚭浜ゆ槗鏃ュ織 +def __export_l2_trade_log(code, date, dir): + LogUtil.extract_log_from_key(code, "D:/logs/gp/l2/l2_trade.{}.log".format(date), + "{}/l2_trade_{}.log".format(dir, date)) + + +# 瀵煎嚭浜ゆ槗鍙栨秷鏃ュ織 +def __export_l2_trade_cancel_log(code, date, dir): + LogUtil.extract_log_from_key(code, "D:/logs/gp/l2/l2_trade_cancel.{}.log".format(date), + "{}/l2_trade_cancel_{}.log".format(dir, date)) + + +def export_l2_log(code): + if len(code) < 6: + return + date = datetime.datetime.now().strftime("%Y-%m-%d") + dir_ = "D:/logs/gp/l2/{}".format(code) + if not os.path.exists(dir_): + os.mkdir(dir_) + __export_l2_pos_range(code, date, dir_) + __export_l2_trade_cancel_log(code, date, dir_) + __export_l2_trade_log(code, date, dir_) + + if __name__ == '__main__': - logger_trade_gui.info("娴嬭瘯") + date = datetime.datetime.now().strftime("%Y-%m-%d") + LogUtil.extract_log_from_key("003005", "D:/logs/gp/l2/l2_process_time.{}.log".format(date), + "D:/logs/gp/l2/l2_process_time{}.{}.log".format("003005", date)) diff --git a/mysql_data.py b/mysql_data.py new file mode 100644 index 0000000..ba7f63d --- /dev/null +++ b/mysql_data.py @@ -0,0 +1,85 @@ +# 鍏堣瀵煎叆pymysql +import pymysql + +# 鎶婅繛鎺ュ弬鏁板畾涔夋垚瀛楀吀 + +config = { + "host": "127.0.0.1", + "port": 3306, + "database": "gp", + "charset": "utf8", + "user": "root", + "passwd": "123456" +} + + +class Mysqldb: + # 鍒濆鍖栨柟娉� + def __init__(self): + # 鍒濆鍖栨柟娉曚腑璋冪敤杩炴帴鏁版嵁搴撶殑鏂规硶 + self.conn = self.get_conn() + # 璋冪敤鑾峰彇娓告爣鐨勬柟娉� + self.cursor = self.get_cursor() + + # 杩炴帴鏁版嵁搴撶殑鏂规硶 + def get_conn(self): + # **config浠h〃涓嶅畾闀垮弬鏁� + conn = pymysql.connect(**config) + return conn + + # 鑾峰彇娓告爣 + def get_cursor(self): + cursor = self.conn.cursor() + return cursor + + # 鏌ヨsql璇彞杩斿洖鐨勬墍鏈夋暟鎹� + def select_all(self, sql): + self.cursor.execute(sql) + return self.cursor.fetchall() + + # 鏌ヨsql璇彞杩斿洖鐨勪竴鏉℃暟鎹� + def select_one(self, sql): + self.cursor.execute(sql) + return self.cursor.fetchone() + + # 鏌ヨsql璇彞杩斿洖鐨勫嚑鏉℃暟鎹� + def select_many(self, sql, num): + self.cursor.execute(sql) + return self.cursor.fetchmany(num) + + # 澧炲垹鏀归櫎浜哠QL璇彞涓嶄竴鏍峰叾浠栭兘鏄竴鏍风殑锛岄兘闇�瑕佹彁浜� + def execute(self, sql, args=None): + try: + # 鎵ц璇彞 + self.cursor.execute(sql, args) + # 鎻愪氦 + self.conn.commit() + except Exception as e: + print("鎻愪氦鍑洪敊\n:", e) + # 濡傛灉鍑洪敊瑕佸洖婊� + self.conn.rollback() + + def execute_many(self, sql, args=None): + try: + # 鎵ц璇彞 + self.cursor.executemany(sql, args) + # 鎻愪氦 + self.conn.commit() + except Exception as e: + print("鎻愪氦鍑洪敊\n:", e) + # 濡傛灉鍑洪敊瑕佸洖婊� + self.conn.rollback() + + # 褰撳璞¤閿�姣佹椂锛屾父鏍囪鍏抽棴,杩炴帴涔熻鍏抽棴 + # 鍒涘缓鏃舵槸鍏堝垱寤鸿繛鎺ュ悗鍒涘缓娓告爣锛屽叧闂椂鏄厛鍏抽棴娓告爣鍚庡叧闂繛鎺� + def __del__(self): + self.cursor.close() + self.conn.close() + + +if __name__ == '__main__': + mysqldb = Mysqldb() + # 鎻掑叆鍗曟潯鏁版嵁 + mysqldb.execute("insert into clients(account,pwd,rule) values(%s,%s,%s)", ("test", 123456, "\"123")) + # 鎻掑叆澶氭潯鏁版嵁 + mysqldb.execute_many("insert into clients(account,pwd,rule) values(%s,%s,%s)", [("test", 123456, "\"123"),("test", 123456, "\"123")]) diff --git a/server.py b/server.py index d3dc3ce..0d53edd 100644 --- a/server.py +++ b/server.py @@ -12,10 +12,14 @@ import alert_util import code_volumn_manager import data_process +import global_util import gpcode_manager import authority import juejin +import l2_data_log import l2_data_manager +import l2_data_manager_new +import log import ths_industry_util import ths_util import tool @@ -24,11 +28,13 @@ from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate from trade_data_manager import TradeCancelDataManager +from trade_queue_manager import THSBuy1VolumnManager class MyTCPServer(socketserver.TCPServer): - def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe=None): - self.pipe = pipe # 澧炲姞鐨勫弬鏁� + def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None): + self.pipe_juejin = pipe_juejin # 澧炲姞鐨勫弬鏁� + self.pipe_ui = pipe_ui socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate) @@ -41,6 +47,7 @@ set_operate_code_state_dict = {} l2_data_error_dict = {} last_trade_delegate_data = None + buy1_volumn_manager = THSBuy1VolumnManager() def setup(self): super().setup() # 鍙互涓嶈皟鐢ㄧ埗绫荤殑setup()鏂规硶锛岀埗绫荤殑setup鏂规硶浠�涔堥兘娌″仛 @@ -65,7 +72,7 @@ if len(data) == 0: # print("瀹㈡埛绔柇寮�杩炴帴") break; - _str = str(data, encoding="gb2312") + _str = str(data, encoding="gbk") if len(_str) > 0: # print("缁撴灉锛�",_str) type = data_process.parseType(_str) @@ -74,14 +81,23 @@ try: __start_time = round(time.time() * 1000) + _start_time = round(time.time() * 1000) # level2鐩樺彛鏁版嵁 day, client, channel, code, capture_time, process_time, datas = l2_data_manager.parseL2Data( _str) # 10ms鐨勭綉缁滀紶杈撳欢鏃� capture_timestamp = __start_time - process_time - 10 - # 淇濆瓨l2鎴浘鏃堕棿 - TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time) + + __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "鏁版嵁瑙f瀽鏃堕棿") + # try: + # self.pipe_ui.send( + # json.dumps({"type": "l2_data_notify", "data": {"count": len(datas), "code": code}})) + # except: + # pass + + # 杩囨椂 淇濆瓨l2鎴浘鏃堕棿 + # TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time) cid, pid = gpcode_manager.get_listen_code_pos(code) # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷� if cid is not None and pid is not None and client == int(cid) and channel == int(pid): @@ -109,8 +125,10 @@ self.set_operate_code_state_dict[key] = round(time.time() * 1000) self.l2CodeOperate.set_operate_code_state(client, channel, 1) + __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, + "l2鏁版嵁姝g‘鎬у垽鏂椂闂�") if gpcode_manager.is_listen(code): - l2_data_manager.L2TradeDataProcessor.process(code, datas, capture_timestamp) + l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) except l2_data_manager.L2DataException as l: # 鍗曚环涓嶇 if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: @@ -133,9 +151,10 @@ __end_time = round(time.time() * 1000) # 鍙褰曞ぇ浜�40ms鐨勬暟鎹� if __end_time - __start_time > 40: - logger_l2_process.info("l2澶勭悊鏃堕棿锛歿}-{}".format(code, __end_time - __start_time)); - except: - pass + l2_data_log.l2_time(code, round(time.time() * 1000) - _start_time, "l2鏁版嵁澶勭悊鎬昏�楁椂", + True) + except Exception as e: + logging.exception(e) elif type == 10: # level2浜ゆ槗闃熷垪 try: @@ -154,7 +173,7 @@ gpcode_manager.set_gp_list(code_list) # 閲嶆柊璁㈤槄 - self.server.pipe.send(json.dumps({"type": "resub"})) + self.server.pipe_juejin.send(json.dumps({"type": "resub"})) # 鍚屾鍚岃姳椤虹洰鏍囦唬鐮� t1 = threading.Thread(target=lambda: sync_target_codes_to_ths()) t1.setDaemon(True) @@ -190,7 +209,7 @@ elif type == 4: # 琛屼笟浠g爜淇℃伅 dataList = data_process.parseList(_str) - data_process.saveIndustryCode(dataList) + ths_industry_util.save_industry_code(dataList) elif type == 6: # 鍙敤閲戦 datas = data_process.parseData(_str) @@ -217,6 +236,20 @@ volumnUnit = item["volumnUnit"] code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit) juejin.accpt_prices(data) + elif type == 50: + data = data_process.parse(_str)["data"] + if data is not None: + index = data["index"] + code_name = data["codeName"] + volumn = data["volumn"] + time_ = data["time"] + code = global_util.name_codes.get(code_name) + if code is None: + global_util.load_name_codes() + code = global_util.name_codes.get(code_name) + if code is not None: + # 淇濆瓨鏁版嵁 + self.buy1_volumn_manager.save(code, time_, volumn) elif type == 30: # 蹇冭烦淇℃伅 @@ -228,7 +261,9 @@ if ths_util.is_ths_dead(client_id): # TODO 閲嶅惎鍚岃姳椤� # 鎶ヨ - alert_util.alarm() + l2_clients = authority.get_l2_clients() + if client_id in l2_clients: + alert_util.alarm() # print("蹇冭烦锛�", client_id) sk.send(return_str.encode()) @@ -250,7 +285,7 @@ try: socketClient.send(json.dumps(data).encode()) recv = socketClient.recv(1024) - result = recv.decode().lstrip() + result = str(recv, encoding="gbk") return result finally: socketClient.close() diff --git a/ths_data.py b/ths_data.py index ff0cd7c..a7041ab 100644 --- a/ths_data.py +++ b/ths_data.py @@ -13,7 +13,7 @@ from scrapy import cmdline from selenium.webdriver import ActionChains from selenium.webdriver.common.by import By -import mongo_data +import mysql_data def save(dn_name, datas): @@ -251,4 +251,4 @@ code = str.split("锛�")[1].strip() _list.append({"_id": name, "first_code": code}) - mongo_data.save("ths-industry", _list) + #mongo_data.save("ths-industry", _list) diff --git a/ths_industry_util.py b/ths_industry_util.py index 9428230..bb2efd5 100644 --- a/ths_industry_util.py +++ b/ths_industry_util.py @@ -3,18 +3,21 @@ """ # 鍚岃姳椤鸿涓� +import time + import global_util -import mongo_data +import mysql_data # 鑾峰彇琛屼笟鏄犲皠 def get_code_industry_maps(): __code_map = {} __industry_map = {} - results = mongo_data.find("ths-industry-codes", {}) + mysqldb = mysql_data.Mysqldb() + results = mysqldb.select_all("select * from ths_industry_codes") for r in results: - code = r["_id"] - industry = r["second_industry"] + code = r[0] + industry = r[1] __code_map[code] = industry if __industry_map.get(industry) is None: __industry_map[industry] = set() @@ -24,6 +27,8 @@ # 璁剧疆琛屼笟鐑害 def set_industry_hot_num(limit_up_datas): + if limit_up_datas is None: + return industry_hot_dict = {} code_industry_map = global_util.code_industry_map if code_industry_map is None or len(code_industry_map) == 0: @@ -44,6 +49,10 @@ percent = float(data["limitUpPercent"]) if percent > 21: percent = 21 + percent = round(percent, 2) + # 淇濆瓨娑ㄥ箙 + global_util.limit_up_codes_percent[code] = percent + industry_hot_dict[industry] = round(industry_hot_dict[industry] + percent, 2) global_util.industry_hot_num = industry_hot_dict @@ -57,7 +66,7 @@ global_util.load_industry() industry = global_util.code_industry_map.get(code) if industry is None: - return None,None + return None, None codes_ = set() for code_ in codes: if global_util.code_industry_map.get(code_) == industry: @@ -66,6 +75,53 @@ return industry, codes_ +# 鑾峰彇杩欎竴鎵规暟鎹殑琛屼笟 +def __get_industry(datas): + ors = [] + codes = set() + for data in datas: + codes.add(data["code"]) + + " or ".join(codes) + for code in codes: + ors.append("first_code='{}'".format(code)) + + mysqldb = mysql_data.Mysqldb() + results = mysqldb.select_all("select * from ths_industry where {}".format(" or ".join(ors))) + + _fname = None + for a in results: + _fname = a[0] + break + print("鏈�缁堢殑浜岀骇琛屼笟鍚嶇О涓猴細", _fname) + return _fname + + +# 淇濆瓨鍗曚釜浠g爜鐨勮涓� +def __save_code_industry(code, industry_name, zyltgb, zyltgb_unit): + mysqldb = mysql_data.Mysqldb() + result = mysqldb.select_one("select * from ths_industry_codes where _id={}".format(code)) + if result is None: + mysqldb.insert( + "insert into ths_industry_codes(code,industry_name,zyltgb,zyltgb_unit) values('{}','{}','{}',{})".format( + code, industry_name, zyltgb, zyltgb_unit, round(time.time() * 1000))) + else: + mysqldb.update( + "update ths_industry_codes set industry_name='{}',zyltgb='{}',zyltgb_unit={} where _id='{}'".format( + industry_name, zyltgb, zyltgb_unit, code)) + + +# 淇濆瓨琛屼笟浠g爜 +def save_industry_code(datasList): + for datas in datasList: + # 鏌ヨ杩欐壒鏁版嵁鎵�灞炶涓� + industry_name = __get_industry(datas) + _list = [] + for data in datas: + # 淇濆瓨 + __save_code_industry(data["code"],industry_name,data["zyltgb"],data["zyltgb_unit"]) + + if __name__ == "__main__": _code_map, _industry_map = get_code_industry_maps() print(_code_map, _industry_map) diff --git a/tool.py b/tool.py index bd2df31..bf5971e 100644 --- a/tool.py +++ b/tool.py @@ -3,6 +3,7 @@ """ import decimal import random +import time import time as t import datetime @@ -47,9 +48,21 @@ return False -if __name__=="__main__": - d1 = decimal.Decimal("0.12") - d2 = decimal.Decimal("0.12") - if d1==d2: - print("123") +def run_time(): + def decorator(func): + def infunc(*args, **kwargs): + start = round(time.time() * 1000) + result = func(args, **kwargs) + print("鎵ц鏃堕棿", round(time.time() * 1000) - start) + return result + return infunc + + return decorator + + +if __name__ == "__main__": + d1 = decimal.Decimal("0.12") + d2 = decimal.Decimal("0.12") + if d1 == d2: + print("123") diff --git a/trade_gui.py b/trade_gui.py index bb64ef9..2d218d1 100644 --- a/trade_gui.py +++ b/trade_gui.py @@ -5,12 +5,16 @@ import array import threading import time +import random import win32gui import win32api import win32con import global_util +import gpcode_manager +import redis_manager +import tool from log import * from threading import Thread @@ -66,9 +70,9 @@ @classmethod def checkEnv(cls): # 妫�娴嬩氦鏄撶獥鍙� - buy_wins = cls.get_buy_wins() - if len(buy_wins) < 3: - raise Exception("闂數涔板叆绐楀彛鏈�浣庨渶瑕�3涓�") + buy_wins = THSBuyWinManagerNew.get_buy_wins() + if len(buy_wins) < 10: + raise Exception("涓嬪崟绐楀彛鏈�浣庨渶瑕�10涓�") # 妫�娴嬫挙鍗曠獥鍙� cancel_trade_win = cls.getCancelBuyWin() @@ -169,7 +173,8 @@ def getLimitUpPrice(self, win): hwnd = win32gui.GetDlgItem(win, 0x000006C8) - return self.getText(hwnd) + text_ = self.getText(hwnd) + return text_.replace("娑ㄥ仠锛�", "") # 鑾峰彇浜ゆ槗缁撴灉 def getTradeResultWin(self): @@ -217,36 +222,40 @@ try: logger_trade_gui.info("寮�濮嬩拱鍏ワ細code-{}".format(code)) if win < 1: - win = self.get_available_buy_win() - if win < 1: + win = THSBuyWinManagerNew.get_distributed_code_win(code) # self.get_available_buy_win() + if win is None or win < 1: raise Exception("鏃犲彲鐢ㄧ殑浜ゆ槗绐楀彛") print("浣跨敤绐楀彛", win) t = time.time() print(t) start = int(round(t * 1000)) - # 杈撳叆浠g爜 - # 浠g爜杈撳叆妗嗙殑鎺т欢ID:0x00000408 - hwnd1 = win32gui.GetDlgItem(win, 0x00000408) - # 鍚嶇О 鍚嶇О鐨勬帶浠禝D:0x0000040C - hwnd_name = win32gui.GetDlgItem(win, 0x0000040C) - self.input_number(hwnd1, code) - # 鏈�澶氱瓑寰�2s閽� - data_fill = False - for i in range(0, 500): - bufSize = win32gui.SendMessage(hwnd_name, win32con.WM_GETTEXTLENGTH, 0, 0) + 1 - print(i, bufSize) - if bufSize > 1: - data_fill = True - break; - time.sleep(0.004) - - if not data_fill: - raise Exception("浠g爜杈撳叆濉厖鍑洪敊") - time.sleep(0.001) + # # 杈撳叆浠g爜 + # # 浠g爜杈撳叆妗嗙殑鎺т欢ID:0x00000408 + # hwnd1 = win32gui.GetDlgItem(win, 0x00000408) + # # 鍚嶇О 鍚嶇О鐨勬帶浠禝D:0x0000040C + # hwnd_name = win32gui.GetDlgItem(win, 0x0000040C) + # self.input_number(hwnd1, code) + # # 鏈�澶氱瓑寰�2s閽� + # data_fill = False + # for i in range(0, 500): + # bufSize = win32gui.SendMessage(hwnd_name, win32con.WM_GETTEXTLENGTH, 0, 0) + 1 + # print(i, bufSize) + # if bufSize > 1: + # data_fill = True + # break; + # time.sleep(0.004) + # + # if not data_fill: + # raise Exception("浠g爜杈撳叆濉厖鍑洪敊") + # time.sleep(0.001) # 楠岃瘉娑ㄥ仠浠� limit_up_price_now = self.getLimitUpPrice(win) + trade_win = THSBuyWinManagerNew.get_trade_win(win) + # if not trade_win: + # error = "浜ゆ槗瀛愮獥鍙f煡鎵惧け璐� {}".format(code) + # raise Exception(error) - # 娴嬭瘯锛屾殏鏃朵笉楠岃瘉娑ㄥ仠浠� + # TODO 鏆傛椂涓嶉獙璇佹定鍋滀环 if not global_util.TEST: if abs(float(limit_up_price_now) - float(limit_up_price)) >= 0.01: error = "娑ㄥ仠浠烽獙璇佸嚭閿� {}-{}".format(limit_up_price, limit_up_price_now) @@ -258,6 +267,7 @@ # win32gui.SendMessage(buy_hwnd, win32con.WM_LBUTTONUP, 0, 0); # 涔板叆 蹇嵎閿瓸 + # 鑾峰彇浜ゆ槗win win32gui.PostMessage(win, win32con.WM_KEYDOWN, 66, 0); logger_trade_gui.info("鎵ц涔板叆缁撴潫锛歝ode-{} 鑰楁椂:{}".format(code, int(round(time.time() * 1000)) - start)) @@ -380,18 +390,432 @@ self.input_number(code_input, "") +class THSGuiUtil: + @classmethod + def getText(cls, hwnd): + bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1 + buffer = array.array('b', b'\x00\x00' * bufSize) + win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer) + text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1) + return text.replace("\x00", "").strip(); + + # 娣诲姞涓嬪崟绐楀彛 + @classmethod + def add_buy_win(cls): + buy_wins = THSGuiTrade().get_buy_wins() + if len(buy_wins) < 1: + raise Exception("娌℃湁涔板叆绐楀彛") + if len(buy_wins) >= 10: + raise Exception("鏈�澶氬彧鑳芥坊鍔�10涓笅鍗曟") + # 澧炲姞绐楀彛鎸夐挳鐨処D锛�00005ED + win = buy_wins[-1] + add_btn = win32gui.GetDlgItem(win, 0x000005ED) + if add_btn <= 0: + raise Exception("娌℃湁鎵惧埌娣诲姞鎸夐挳") + try: + win32gui.SetForegroundWindow(win) + except: + pass + cls.click(add_btn) + for i in range(0, 30): + new_buy_wins = THSGuiTrade().get_buy_wins() + if len(new_buy_wins) - len(buy_wins) >= 1: + # 姹傚樊闆� + list_ = list(set(new_buy_wins).difference(set(buy_wins))) + return list_[0] + else: + time.sleep(0.01) + raise Exception("鏈坊鍔犳垚鍔�") + + # 绐楀彛鏄惁瀛樺湪 + @classmethod + def is_win_exist(cls, win): + try: + result = win32gui.IsWindowVisible(win) + if result: + return True + else: + return False + except: + return False + + # 绐楀彛鏄惁姝e湪灞曠ず + @classmethod + def is_win_show(cls, win): + try: + result = win32gui.GetWindowRect(win) + if result[2] - result[0] > 0 and result[3] - result[1] > 0: + return True + else: + return False + except: + return False + + @classmethod + def click(cls, control): + win32gui.SendMessage(control, win32con.WM_LBUTTONDOWN, 0, 0) + win32gui.SendMessage(control, win32con.WM_LBUTTONUP, 0, 0) + + # 娓呴櫎涔板叆绐楀彛浠g爜 + @classmethod + def clear_buy_window_code(cls, win): + if not cls.is_win_exist(win): + raise Exception("绐楀彛涓嶅瓨鍦�") + + hwnd1 = win32gui.GetDlgItem(win, 0x00000408) + if hwnd1 <= 0: + raise Exception("缂栬緫鎺т欢娌℃壘鍒�") + THSGuiTrade().input_number(hwnd1, "") + + # 璁剧疆涔板叆绐楀彛浠g爜 + @classmethod + def set_buy_window_code(cls, win, code): + if not cls.is_win_exist(win): + raise Exception("绐楀彛涓嶅瓨鍦�") + try: + win32gui.SetForegroundWindow(win) + except: + pass + hwnd1 = win32gui.GetDlgItem(win, 0x00000408) + if hwnd1 <= 0: + raise Exception("缂栬緫鎺т欢娌℃壘鍒�") + THSGuiTrade().input_number(hwnd1, code) + + +# 杩囨椂 鍚岃姳椤轰拱鍏ョ獥鍙g鐞嗗櫒 +class __THSBuyWinManager: + redisManager = redis_manager.RedisManager(2) + + @classmethod + def __get_redis(cls): + return cls.redisManager.getRedis() + + # 淇濆瓨绐楀彛浠g爜鍒嗛厤 + @classmethod + def __save_code_win(cls, code, win): + key = "buywin_distribute-{}".format(code) + cls.__get_redis().setex(key, tool.get_expire(), win) + + # 鑾峰彇绐楀彛鍒嗛厤鐨勪唬鐮� + @classmethod + def __get_code_win(cls, code): + key = "buywin_distribute-{}".format(code) + win = cls.__get_redis().get(key) + if win is not None: + return int(win) + return None + + # 鍒犻櫎浠g爜绐楀彛鍒嗛厤 + @classmethod + def __del_code_win(cls, code): + key = "buywin_distribute-{}".format(code) + cls.__get_redis().delete(key) + + # 鑾峰彇鎵�鏈夊凡缁忓垎閰嶇獥鍙g殑浠g爜 + @classmethod + def __get_distributed_win_codes(cls): + key = "buywin_distribute-*" + keys = cls.__get_redis().keys(key) + codes = [] + for k in keys: + codes.append(k.replace("buywin_distribute-", "")) + return codes + + # 鑾峰彇鍙敤鐨勭獥鍙� + @classmethod + def __get_available_win(cls): + # 鏄惁鏈夊彲鐢ㄧ殑杩樻湭鍒嗛厤鐨勭獥鍙� + key = "buywin_distribute-*" + keys = cls.__get_redis().keys(key) + win_list = THSGuiTrade().get_buy_wins() + if len(win_list) < 1: + raise Exception("蹇呴』瑕佹湁涓�涓拱鍏ョ獥鍙�") + win_set = set(win_list) + for k in keys: + win = int(cls.__get_redis().get(k)) + if win in win_set: + win_set.remove(win) + if len(win_set) > 0: + return win_set.pop() + + # 娌℃湁鍓╀綑鐨勭獥鍙o紝鏂板鍔犵獥鍙� + win = THSGuiUtil.add_buy_win() + if win: + return win + else: + raise Exception("鏂板绐楀彛澶辫触") + + # 涓轰唬鐮佸垎閰嶇獥鍙� + @classmethod + def distribute_win_for_code(cls, code): + # 鑾峰彇鏄惁宸茬粡鍒嗛厤 + win = cls.__get_code_win(code) + if win is not None: + # 宸茬粡鍒嗛厤鐨勭獥鍙f槸鍚︽湁鏁� + if THSGuiUtil.is_win_exist(win): + # 濉厖浠g爜 + THSGuiUtil.set_buy_window_code(win, code) + return win + + # 鑾峰彇鍙敤鐨勭獥鍙� + win = cls.__get_available_win() + if win is None: + raise Exception("绐楀彛宸茬粡鍒嗛厤瀹屾瘯锛屾棤鍙敤绐楀彛") + # 淇濆瓨绐楀彛鍒嗛厤淇℃伅 + cls.__save_code_win(code, win) + THSGuiUtil.set_buy_window_code(win, code) + return win + + # 鍒犻櫎浠g爜绐楀彛鍒嗛厤 + @classmethod + def cancel_distribute_win_for_code(cls, code): + win = cls.__get_code_win(code) + if win is not None: + # 娓呴櫎浠g爜 + THSGuiUtil.clear_buy_window_code(win) + cls.__del_code_win(code) + + # 鑾峰彇浠g爜宸茬粡鍒嗛厤鐨勭獥鍙� + @classmethod + def get_distributed_code_win(cls, code): + win = cls.__get_code_win(code) + if not THSGuiUtil.is_win_exist(win): + # 鍒犻櫎涓嶅瓨鍦ㄧ殑绐楀彛 + cls.__del_code_win(code) + return None + return win + + @classmethod + def fill_codes(cls, codes): + codes_ = gpcode_manager.get_gp_list() + # 鍏堝垹闄ゆ病鏈夌殑浠g爜 + old_codes = cls.__get_distributed_win_codes() + for code in old_codes: + if code not in codes_: + cls.cancel_distribute_win_for_code(code) + add_codes = codes[0:10] + del_codes = codes[10:] + + for code in del_codes: + cls.cancel_distribute_win_for_code(code) + + for code in add_codes: + # 宸茬粡鍔犲叆杩涘幓鐨勪笉鍋氭搷浣� + if code in old_codes: + continue + win = cls.distribute_win_for_code(code) + print("鍒嗛厤鐨勭獥鍙�:", win, THSGuiUtil.is_win_exist(win)) + + +# 鍚岃姳椤轰拱鍏ョ獥鍙g鐞嗗櫒 +class THSBuyWinManagerNew: + redisManager = redis_manager.RedisManager(2) + + @classmethod + def get_buy_wins(cls): + buy_win_list = [] + hWndList = [] + main_hwnd = None + win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList) + for hwnd in hWndList: + if THSGuiUtil.getText(hwnd) == "涓撲笟鐗堜笅鍗�": + main_hwnd = hwnd + break + if not main_hwnd: + raise Exception("涓撲笟鐗堜笅鍗曟湭鎵撳紑") + child_win = None + for i in range(0, 20): + child_win = win32gui.FindWindowEx(main_hwnd, child_win, "#32770", None) + if not child_win: + break + if not win32gui.IsWindowVisible(child_win): + continue + temp = win32gui.FindWindowEx(child_win, None, "Button", "鎾ゅ崟") + if temp: + buy_win_list.append(child_win) + return buy_win_list + + @classmethod + def get_trade_win(cls, win): + # 鑾峰彇浜ゆ槗绐楀彛 + child_child_win = None + for j in range(0, 10): + child_child_win = win32gui.FindWindowEx(win, child_child_win, "#32770", None) + if not child_child_win: + break + temp = win32gui.FindWindowEx(child_child_win, None, "Edit", None) + if temp: + return child_child_win + return None + + @classmethod + def __get_redis(cls): + return cls.redisManager.getRedis() + + # 淇濆瓨绐楀彛浠g爜鍒嗛厤 + @classmethod + def __save_code_win(cls, code, win): + key = "buywin_distribute-{}".format(code) + cls.__get_redis().setex(key, tool.get_expire(), win) + + # 鑾峰彇绐楀彛鍒嗛厤鐨勪唬鐮� + @classmethod + def __get_code_win(cls, code): + key = "buywin_distribute-{}".format(code) + win = cls.__get_redis().get(key) + if win is not None: + return int(win) + return None + + # 鍒犻櫎浠g爜绐楀彛鍒嗛厤 + @classmethod + def __del_code_win(cls, code): + key = "buywin_distribute-{}".format(code) + cls.__get_redis().delete(key) + + # 鑾峰彇鎵�鏈夊凡缁忓垎閰嶇獥鍙g殑浠g爜 + @classmethod + def __get_distributed_win_codes(cls): + key = "buywin_distribute-*" + keys = cls.__get_redis().keys(key) + codes = [] + for k in keys: + codes.append(k.replace("buywin_distribute-", "")) + return codes + + # 鑾峰彇鍙敤鐨勭獥鍙� + @classmethod + def __get_available_win(cls): + # 鏄惁鏈夊彲鐢ㄧ殑杩樻湭鍒嗛厤鐨勭獥鍙� + key = "buywin_distribute-*" + keys = cls.__get_redis().keys(key) + win_list = cls.get_buy_wins() + if len(win_list) < 1: + raise Exception("蹇呴』瑕佹湁涓�涓拱鍏ョ獥鍙�") + win_set = set(win_list) + for k in keys: + win = int(cls.__get_redis().get(k)) + if win in win_set: + win_set.remove(win) + if len(win_set) > 0: + return win_set.pop() + + # 娌℃湁鍓╀綑鐨勭獥鍙o紝鏂板鍔犵獥鍙� + raise Exception("娌℃湁鍓╀綑绐楀彛") + + # 涓轰唬鐮佸垎閰嶇獥鍙� + @classmethod + def distribute_win_for_code(cls, code): + # 鑾峰彇鏄惁宸茬粡鍒嗛厤 + win = cls.__get_code_win(code) + if win is not None: + # 宸茬粡鍒嗛厤鐨勭獥鍙f槸鍚︽湁鏁� + if THSGuiUtil.is_win_exist(win): + # 濉厖浠g爜 + THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code) + return win + + # 鑾峰彇鍙敤鐨勭獥鍙� + win = cls.__get_available_win() + if win is None: + raise Exception("绐楀彛宸茬粡鍒嗛厤瀹屾瘯锛屾棤鍙敤绐楀彛") + # 淇濆瓨绐楀彛鍒嗛厤淇℃伅 + cls.__save_code_win(code, win) + THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code) + return win + + # 鍒犻櫎浠g爜绐楀彛鍒嗛厤 + @classmethod + def cancel_distribute_win_for_code(cls, code): + win = cls.__get_code_win(code) + if win is not None: + # 娓呴櫎浠g爜 + try: + THSGuiUtil.clear_buy_window_code(win) + except: + pass + cls.__del_code_win(code) + + # 鑾峰彇浠g爜宸茬粡鍒嗛厤鐨勭獥鍙� + @classmethod + def get_distributed_code_win(cls, code): + win = cls.__get_code_win(code) + if not THSGuiUtil.is_win_exist(win): + # 鍒犻櫎涓嶅瓨鍦ㄧ殑绐楀彛 + cls.__del_code_win(code) + return None + return win + + # 鑾峰彇浠g爜鍚嶇О + @classmethod + def __get_code_name(cls, win): + trade_win = cls.get_trade_win(win) + if trade_win is None: + return None + code_name_win = win32gui.GetDlgItem(trade_win, 0x000005C2) + return THSGuiUtil.getText(code_name_win) + + @classmethod + def fill_codes(cls, codes): + name_codes = gpcode_manager.get_name_codes() + codes_ = gpcode_manager.get_gp_list() + # 鍏堝垹闄ゆ病鏈夌殑浠g爜 + old_codes = cls.__get_distributed_win_codes() + for code in old_codes: + if code not in codes_: + cls.cancel_distribute_win_for_code(code) + add_codes = codes[0:10] + del_codes = codes[10:] + + for code in del_codes: + cls.cancel_distribute_win_for_code(code) + + for code in add_codes: + # 宸茬粡鍔犲叆杩涘幓鐨勪笉鍋氭搷浣� + if code in old_codes: + # 鏍¢獙浠g爜鏄惁濉厖瀵� + win = cls.__get_code_win(code) + if not THSGuiUtil.is_win_exist(win): + cls.cancel_distribute_win_for_code(code) + else: + code_name = cls.__get_code_name(win) + if name_codes.get(code_name) != code: + cls.cancel_distribute_win_for_code(code) + continue + win = cls.distribute_win_for_code(code) + print("鍒嗛厤鐨勭獥鍙�:", win, THSGuiUtil.is_win_exist(win)) + + +class GUITest: + def test_distribute(self): + codes = ["300396", "688656", "688029", "688787", "688016", "002659", "002777", "603318", "000333", "003004", + "002882", "300014", "688981", "002531"] + + for i in range(10, len(codes)): + THSBuyWinManagerNew.cancel_distribute_win_for_code(codes[i]) + + for i in range(0, 10): + win = THSBuyWinManagerNew.distribute_win_for_code(codes[i]) + time.sleep(1) + print("鍒嗛厤鐨勭獥鍙�:", win, THSGuiUtil.is_win_exist(win)) + + random.shuffle(codes) + print(codes[0:10]) + for i in range(10, len(codes)): + THSBuyWinManagerNew.cancel_distribute_win_for_code(codes[i]) + + for i in range(0, 10): + win = THSBuyWinManagerNew.distribute_win_for_code(codes[i]) + time.sleep(1) + print("鍒嗛厤鐨勭獥鍙�:", win, THSGuiUtil.is_win_exist(win)) + + # THSBuyWinManager.cancel_distribute_win_for_code("600125") + + if __name__ == '__main__': - try: - # THSGuiTrade.checkEnv(); - # print("鐜姝e父") - trade = THSGuiTrade(); - print(id(trade)) - # win = trade.get_available_buy_win() - # if win < 1: - # raise Exception("鏃犲彲鐢ㄧ殑浜ゆ槗绐楀彛") - # result = trade.buy("002564", "7.26") - # # print("浜ゆ槗鎴愬姛") - # time.sleep(0.2) - trade.cancel_buy("000716") - except Exception as e: - print(e) + THSGuiTrade().buy("002853", "18.98", THSBuyWinManagerNew.get_buy_wins()[5]) + # GUITest().test_distribute() + # try: + # THSGuiUtil.set_buy_window_code(0x000112D0, "000333") + # except Exception as e: + # print(e) diff --git a/trade_manager.py b/trade_manager.py index e82fae4..615145d 100644 --- a/trade_manager.py +++ b/trade_manager.py @@ -9,7 +9,7 @@ import gpcode_manager import l2_code_operate -import mongo_data +import mysql_data import tool from trade_data_manager import TradeBuyDataManager from trade_gui import THSGuiTrade, async_call @@ -85,6 +85,7 @@ redis = __redis_manager.getRedis() time_str = datetime.datetime.now().strftime("%H:%M:%S") redis.setex("trade-success-latest-time", tool.get_expire(), time_str) + mysqldb = mysql_data.Mysqldb() for data in datas: _time = data["time"] # 杩囨护閿欒鏁版嵁 @@ -93,22 +94,35 @@ data["_id"] = data["trade_num"] data["day"] = day data["create_time"] = int(round(t.time() * 1000)) - count = mongo_data.count("ths-trade-success-record", {"_id": data["_id"]}) - if count is None or count < 1: - mongo_data.save_one("ths-trade-success-record", data) + counts = mysqldb.select_one("select count(*) from ths_trade_success_record where _id='{}'".format(data["_id"])) + if counts[0] < 1: + mysqldb.execute( + "insert into ths_trade_success_record(_id,code,money,num,price,time,trade_num,type,day,create_time) values('{}','{}','{}','{}','{}','{}','{}',{},'{}',{})".format( + data["_id"], data["code"], data["money"], data["num"], data["price"], data["time"], + data["trade_num"], data["type"], data["day"], round(t.time() * 1000))) + else: + mysqldb.execute( + "update ths_trade_success_record set money=%s, num=%s, price=%s,time=%s,trade_num=%s,type=%s where _id=%s",( + data["money"], data["num"], data["price"], data["time"], data["trade_num"], data["type"],data["_id"])) # 淇濆瓨浜ゆ槗濮旀墭鏁版嵁 def save_trade_delegate_data(datas): day = datetime.datetime.now().strftime("%Y%m%d") time_str = datetime.datetime.now().strftime("%H:%M:%S") + mysqldb = mysql_data.Mysqldb() for data in datas: data["_id"] = "{}-{}-{}".format(day, data["code"], data["time"]) data["day"] = day data["create_time"] = int(round(t.time() * 1000)) - count = mongo_data.count("ths-trade-delegate-record", {"_id": data["_id"]}) - if count is None or count < 1: - mongo_data.save_one("ths-trade-delegate-record", data) + counts = mysqldb.select_one("select count(*) from ths_trade_delegate_record where _id='{}'".format(data["_id"])) + if counts[0] < 1: + mysqldb.execute( + "insert into ths_trade_delegate_record(_id,code,num,price,time,trade_num,trade_price,type,day,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%d)", + ( + data["_id"], data["code"], data["num"], data["price"], data["time"], + data["trade_num"],data["trade_price"], data["type"], data["day"], round(t.time() * 1000))) + # 淇濆瓨鏈�鏂扮殑濮旀墭鏁版嵁 redis = __redis_manager.getRedis() redis.setex("trade-delegate-latest", tool.get_expire(), json.dumps(datas)) @@ -119,7 +133,15 @@ def get_trade_success_data(): redis = __redis_manager.getRedis() day = datetime.datetime.now().strftime("%Y%m%d") - return mongo_data.find("ths-trade-success-record", {"day": day}), redis.get("trade-success-latest-time") + mysqldb = mysql_data.Mysqldb() + results = mysqldb.select_all("select * from ths_trade_success_record where day='{}'".format(day)) + datas = [] + for result in results: + data = {"_id": result[0], "code": result[1], "money": result[2], "num": result[3], "price": result[4], + "time": result[5], "trade_num": result[6], "type": result[7], "day": result[8], + "create_time": result[9]} + datas.append(data) + return datas, redis.get("trade-success-latest-time") # 鑾峰彇浜ゆ槗濮旀墭鏁版嵁 @@ -316,8 +338,12 @@ redis_info = redis_manager.RedisManager(0).getRedis() keys = redis_info.keys("*{}*".format(code)) for k in keys: - if k.find("pre") is None or k.find("pre") or k.find("zyltgb") < 0: - redis_info.delete(k) + if k.find("pre") is not None: + continue + if k.find("zyltgb") is not None: + continue + + redis_info.delete(k) def __clear_big_data(): @@ -330,6 +356,6 @@ if __name__ == "__main__": # time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # print(time_str) - # __clear_data("002093") - __clear_big_data() + __clear_data("002388") + # __clear_big_data() pass diff --git a/trade_queue_manager.py b/trade_queue_manager.py new file mode 100644 index 0000000..adc48df --- /dev/null +++ b/trade_queue_manager.py @@ -0,0 +1,134 @@ +# 涔�1閲忕鐞� +import decimal +import json + +import gpcode_manager +import redis_manager +import tool + + +class THSBuy1VolumnManager: + __redisManager = redis_manager.RedisManager(1) + __last_data = {} + __code_time_volumn_dict = {} + + def __get_redis(self): + return self.__redisManager.getRedis() + + def __save_recod(self, code, time_str, volumn): + + # 淇濆瓨姣忎竴娆$殑 + key = "buy1_volumn-{}-{}".format(code, time_str) + self.__get_redis().setex(key, tool.get_expire(), volumn) + # 淇濆瓨鏈�杩戠殑 + key = "buy1_volumn_latest_info-{}".format(code) + self.__get_redis().setex(key, tool.get_expire(), json.dumps((time_str, volumn))) + + # 淇濆瓨涓婁竴娆℃暟鎹� + def __save_last_recod(self, code, time_str, volumn): + # 淇濆瓨鏈�杩戠殑 + key = "buy1_volumn_last_info-{}".format(code) + self.__get_redis().setex(key, tool.get_expire(), json.dumps((time_str, volumn))) + + def __get_last_record(self, code): + key = "buy1_volumn_last_info-{}".format(code) + val = self.__get_redis().get(key) + if val is None: + return None, None + val = json.loads(val) + return val[0], val[1] + + def __get_latest_record(self, code): + key = "buy1_volumn_latest_info-{}".format(code) + val = self.__get_redis().get(key) + if val is None: + return None, None + val = json.loads(val) + return val[0], val[1] + + # 杩斿洖鏄惁闇�瑕佹洿鏂版暟鎹� + def save(self, code, time_str, volumn): + if volumn < 1: + return False + # 涓嶄繚瀛樺拰涓婁竴娆$浉鍚岀殑鏁版嵁 + if code in self.__last_data and self.__last_data[code] == volumn: + return False + self.__last_data[code] = volumn + + if code not in self.__code_time_volumn_dict: + self.__code_time_volumn_dict[code] = {} + self.__code_time_volumn_dict[code][time_str] = volumn + # 鍒犻櫎鍊掓暟绗�2涓箣鍓嶇殑鏁版嵁 + keys = [] + for k in self.__code_time_volumn_dict[code].keys(): + keys.append(k) + keys.sort(key=lambda val: int(val.replace(":", ""))) + if len(keys) > 2: + for i in range(0, len(keys) - 2): + self.__code_time_volumn_dict[code].pop(keys[i]) + keys = keys[len(keys) - 2:] + if len(keys) == 2: + self.__save_last_recod(code, keys[0], self.__code_time_volumn_dict[code][keys[0]]) + + self.__save_recod(code, time_str, volumn) + return True + + # 鑾峰彇鏍¢獙鏁版嵁 + # 杩斿洖涓婁竴娆$殑鏁版嵁锛屽鏋滄病鏈変笂涓�娆$殑灏辫繑鍥炴湰娆$殑 + def get_verify_data(self, code): + time_str, volumn = self.__get_last_record(code) + if time_str is not None: + return time_str, volumn + time_str, volumn = self.__get_latest_record(code) + return time_str, volumn + + +class JueJinBuy1VolumnManager: + __redisManager = redis_manager.RedisManager(1) + __last_data = {} + + def __get_redis(self): + return self.__redisManager.getRedis() + + def __save_recod(self, code, time_str, volumn): + # 淇濆瓨姣忎竴娆$殑 + key = "buy1_volumn_juejin-{}-{}".format(code, time_str) + self.__get_redis().setex(key, tool.get_expire(), volumn) + key = "buy1_volumn_juejin_latest_info-{}".format(code) + self.__get_redis().setex(key, tool.get_expire(), volumn) + + def __get_latest_record(self, code): + key = "buy1_volumn_juejin_latest_info-{}".format(code) + val = self.__get_redis().get(key) + if val is None: + return None, None + val = json.loads(val) + return val[0], val[1] + + # 杩斿洖鏄惁闇�瑕佹洿鏂版暟鎹� + def save(self, code, time_str, volumn, price): + + # 鍒ゆ柇鏄惁涓烘定鍋滀环 + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price != tool.to_price(decimal.Decimal(price)): + # 闈炴定鍋滀环 + return False + + if volumn < 1: + return False + # 涓嶄繚瀛樺拰涓婁竴娆$浉鍚岀殑鏁版嵁 + if code in self.__last_data and self.__last_data[code] == volumn: + return False + self.__last_data[code] = volumn + self.__save_recod(code, time_str, volumn) + return True + + # 鑾峰彇鏍¢獙鏁版嵁 + # 杩斿洖涓婁竴娆$殑鏁版嵁锛屽鏋滄病鏈変笂涓�娆$殑灏辫繑鍥炴湰娆$殑 + def get_verify_data(self, code): + time_str, volumn = self.__get_latest_record(code) + return time_str, volumn + +if __name__ == '__main__': + + JueJinBuy1VolumnManager().save("001203", "15:00:00", 40586553, 12.12) \ No newline at end of file -- Gitblit v1.8.0