From 892b50e242e3c59a738b92dfdfee1bf1ff8932f2 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 21 十月 2022 16:59:58 +0800 Subject: [PATCH] 新策略修改 --- data_process.py | 69 ++++------------------------------ 1 files changed, 8 insertions(+), 61 deletions(-) diff --git a/data_process.py b/data_process.py index 2dafd7d..fbe6a51 100644 --- a/data_process.py +++ b/data_process.py @@ -5,9 +5,9 @@ import time as t import authority +import mysql_data import redis_manager import gpcode_manager -import mongo_data # 缁熻浠婃棩鍗栧嚭 # 缁熻浠婃棩涔板叆 @@ -17,27 +17,6 @@ __redisManager = redis_manager.RedisManager(0) - -def _mysql_insert_data(day, code, item, conn): - try: - with conn.cursor() as cursor: - sql = f"insert into level2_data(day,code,time,price,num,limit_price,operate_type,cancel_time,cancel_time_unit, md5,create_time) values ('{day}','{code}','{item['time']}','{item['price']}',{item['num']},{item['limitPrice']},{item['operateType']},{item['cancelTime']},{item['cancelTimeUnit']},'{item['md5']}',now())" - print(sql) - cursor.execute(sql) - conn.commit() - except Exception as e: - conn.rollback() - - -def _mysql_update_data(item, conn): - try: - with conn.cursor() as cursor: - sql = "update level2_data set re = {}, update_time=now() where md5='{}'".format(item['re'], item['md5']) - print(sql) - cursor.execute(sql) - conn.commit() - except Exception as e: - conn.rollback() def parse(str): @@ -117,53 +96,21 @@ return json.loads(data_str) -def _getIndustry(datas): - ors = [] - codes = set() - for data in datas: - codes.add(data["code"]) - - for code in codes: - ors.append({'first_code': code}) - result = mongo_data.find("ths-industry", {'$or': ors}) - - _fname = None - for a in result: - _fname = a["_id"] - break - print("鏈�缁堢殑浜岀骇琛屼笟鍚嶇О涓猴細", _fname) - return _fname - - -def saveIndustryCode(datasList): - for datas in datasList: - # 鏌ヨ杩欐壒鏁版嵁鎵�灞炶涓� - industry_name = _getIndustry(datas); - _list = [] - for data in datas: - # 淇濆瓨 - _dict = {"_id": data["code"]} - _dict["second_industry"] = industry_name - _dict["zyltgb"] = data["zyltgb"] - _dict["zyltgb_unit"] = data["zyltgb_unit"] - _dict["update_time"] = int(round(t.time() * 1000)) - _list.append(_dict) - mongo_data.save("ths-industry-codes", _list) - - # 淇濆瓨鑷敱娴侀�氬競鍊� def saveZYLTSZ(datasList): - redis = __redisManager.getRedis() - _list = [] + mysqldb = mysql_data.Mysqldb() for data in datasList: # 淇濆瓨 _dict = {"_id": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgb_unit"], "update_time": int(round(t.time() * 1000))} if float(data["zyltgb"]) > 0: - _list.append(_dict) # 淇濆瓨10澶� ZYLTGBUtil.save(data["code"], data["zyltgb"], data["zyltgb_unit"]) - mongo_data.save("ths-zylt", _list) + result = mysqldb.select_one("select * from ths_zylt where _id='{}'".format(data["code"])) + if result is None: + mysqldb.execute("insert into ths_zylt(_id,zyltgb,zyltgb_unit,update_time) values ('{}','{}',{},{})".format(data["code"],data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000))) + else: + mysqldb.execute("update ths_zylt set zyltgb='{}',zyltgb_unit={},update_time={} where _id='{}'".format(data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000),data["code"])) def saveClientActive(client_id, host, thsDead): @@ -182,7 +129,7 @@ _id = k.split("client-active-")[1] # 瀹㈡埛绔悓鑺遍『娌″崱姝绘墠鑳藉姞鍏� if not ths_util.is_ths_dead(_id): - client_ids.append(_id) + client_ids.append(int(_id)) l2_clients = authority.get_l2_clients() return list(set(client_ids).intersection(set(l2_clients))) -- Gitblit v1.8.0