# Data processing: parsing of incoming JSON messages and persistence helpers
# backed by Redis, MongoDB and MySQL.
import decimal
import json
import logging
import time as t
import random
import datetime

import authority
import mysql
import redis_manager
import gpcode_manager
import mongo_data
# Count today's sells
# Count today's buys
import tool

__redisManager = redis_manager.RedisManager(0)


def _mysql_insert_data(day, code, item, conn):
    """Insert one level-2 record into the ``level2_data`` table.

    Args:
        day: Value stored in the ``day`` column.
        code: Value stored in the ``code`` column.
        item: Mapping providing ``time``, ``price``, ``num``, ``limitPrice``,
            ``operateType``, ``cancelTime``, ``cancelTimeUnit`` and ``md5``.
        conn: Database connection exposing ``cursor()``, ``commit()`` and
            ``rollback()``.

    Failures are logged and the transaction is rolled back; nothing is
    re-raised, so the call stays best-effort for the caller.
    """
    # Values are passed as bound parameters instead of being interpolated
    # into the statement, so quotes/None in the payload cannot break or
    # alter the SQL.
    # NOTE(review): assumes the driver uses the "%s" paramstyle (PyMySQL,
    # mysqlclient, mysql-connector) - confirm against the `mysql` module.
    sql = (
        "insert into level2_data(day,code,time,price,num,limit_price,"
        "operate_type,cancel_time,cancel_time_unit, md5,create_time) "
        "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,now())"
    )
    try:
        params = (
            day,
            code,
            item['time'],
            item['price'],
            item['num'],
            item['limitPrice'],
            item['operateType'],
            item['cancelTime'],
            item['cancelTimeUnit'],
            item['md5'],
        )
        with conn.cursor() as cursor:
            print(sql, params)
            cursor.execute(sql, params)
        conn.commit()
    except Exception as e:
        # Record the failure instead of dropping it silently.
        logging.exception(e)
        conn.rollback()


def _mysql_update_data(item, conn):
    """Update the ``re`` column of the ``level2_data`` row matching ``md5``.

    Args:
        item: Mapping providing ``re`` (new value) and ``md5`` (row selector).
        conn: Database connection exposing ``cursor()``, ``commit()`` and
            ``rollback()``.

    Failures are logged and the transaction is rolled back; nothing is
    re-raised.
    """
    # Bound parameters instead of str.format; see _mysql_insert_data.
    sql = "update level2_data set re = %s, update_time=now() where md5=%s"
    try:
        params = (item['re'], item['md5'])
        with conn.cursor() as cursor:
            print(sql, params)
            cursor.execute(sql, params)
        conn.commit()
    except Exception as e:
        logging.exception(e)
        conn.rollback()


# NOTE: the parse* functions keep the parameter name ``str`` (which shadows
# the builtin inside their bodies) so that existing keyword callers keep
# working.
def parse(str):
    """Decode a JSON document and return the resulting object."""
    return json.loads(str)


def toJson(_dict):
    """Serialize ``_dict`` to a JSON string."""
    return json.dumps(_dict)


def parseType(str):
    """Return the ``type`` field of a JSON message.

    Returns -1 (after logging the exception) when the message cannot be
    decoded or has no ``type`` field.
    """
    try:
        payload = json.loads(str)
        return payload["type"]
    except Exception as e:
        logging.exception(e)
        return -1


def parseGPCode(str):
    """Return the ``data`` field of a JSON message."""
    payload = json.loads(str)
    return payload["data"]


def parseList(str):
    """Return the ``data`` field of a JSON message, or ``[]`` when it is null."""
    payload = json.loads(str)
    data = payload["data"]
    return data if data is not None else []


def parseData(str):
    """Return the ``data`` field of a JSON message."""
    payload = json.loads(str)
    return payload["data"]


def parseL2TradeQueueData(str):
    """Extract ``(code, trade_data)`` from an L2 trade-queue message.

    The message's ``data`` object must contain ``code`` and a nested ``data``
    entry; the two are returned as a tuple.
    """
    payload = json.loads(str)
    data = payload["data"]
    return data["code"], data["data"]


# Whether the price is plausible for the given code
def is_same_code_with_price(code, price):
    """Check that ``price`` lies within 0.9x..1.1x of the previous close.

    Args:
        code: Stock code passed to ``gpcode_manager.get_price_pre``.
        price: Price to validate; converted via ``Decimal(str(price))``.

    Returns:
        True when the price is inside the inclusive band, otherwise False.
    """
    # Previous trading day's closing price
    price_close = decimal.Decimal(str(gpcode_manager.get_price_pre(code)))
    max_price = tool.to_price(price_close * decimal.Decimal("1.1"))
    min_price = tool.to_price(price_close * decimal.Decimal("0.9"))
    if min_price <= decimal.Decimal(str(price)) <= max_price:
        return True
    return False


# Save the L2 trade queue
def saveL2TradeQueueData(code, data):
    """Cache the trade queue of ``code`` in Redis as JSON."""
    redis = __redisManager.getRedis()
    key = "trade-queue-{}".format(code)
    # Keep the data for 5 seconds only
    redis.setex(key, 5, json.dumps(data))


# Get the L2 trade queue
def getL2TradeQueueData(code):
    """Return the cached trade queue of ``code``, or None if absent/empty."""
    redis = __redisManager.getRedis()
    data_str = redis.get("trade-queue-{}".format(code))
    if not data_str:
        return None
    return json.loads(data_str)


def _getIndustry(datas):
    """Look up the industry name for a batch of records.

    Queries the ``ths-industry`` collection for any document whose
    ``first_code`` equals one of the batch's codes and returns the ``_id``
    of the first match (None when nothing matches).
    """
    codes = {data["code"] for data in datas}
    ors = [{'first_code': code} for code in codes]
    result = mongo_data.find("ths-industry", {'$or': ors})
    _fname = None
    for doc in result:
        # Only the first matching document is used.
        _fname = doc["_id"]
        break
    print("最终的二级行业名称为:", _fname)
    return _fname


def saveIndustryCode(datasList):
    """Save each batch of codes together with its resolved industry name.

    Args:
        datasList: Iterable of batches; every record in a batch provides
            ``code``, ``zyltgb`` and ``zyltgb_unit``.
    """
    for datas in datasList:
        # Resolve the industry this batch belongs to
        industry_name = _getIndustry(datas)
        _list = []
        for data in datas:
            # Build the document to save
            _list.append({
                "_id": data["code"],
                "second_industry": industry_name,
                "zyltgb": data["zyltgb"],
                "zyltgb_unit": data["zyltgb_unit"],
                # Millisecond timestamp
                "update_time": int(round(t.time() * 1000)),
            })
        mongo_data.save("ths-industry-codes", _list)


# Save the free-float market value
def saveZYLTSZ(datasList):
    """Save the ``zyltgb`` figures of all records whose value is positive."""
    _list = []
    for data in datasList:
        # Build the document to save
        _dict = {
            "_id": data["code"],
            "zyltgb": data["zyltgb"],
            "zyltgb_unit": data["zyltgb_unit"],
            "update_time": int(round(t.time() * 1000)),
        }
        # Records with a non-positive value are skipped.
        if float(data["zyltgb"]) > 0:
            _list.append(_dict)
    mongo_data.save("ths-zylt", _list)


def saveClientActive(client_id, host, thsDead):
    """Record a client heartbeat in Redis with a 10-second expiry.

    The stored value is the JSON encoding of ``(host, thsDead)``. Calls with
    ``client_id <= 0`` are ignored.
    """
    if client_id <= 0:
        return
    redis = __redisManager.getRedis()
    redis.setex("client-active-{}".format(client_id), 10,
                json.dumps((host, thsDead)))


def getValidL2Clients():
    """Return the ids of active clients that are also L2 clients.

    Active clients are those with a live ``client-active-*`` key in Redis;
    the result is their intersection with ``authority.get_l2_clients()``.
    """
    redis = __redisManager.getRedis()
    client_ids = []
    for key in redis.keys("client-active-*"):
        # Redis clients created without decode_responses return bytes keys.
        if isinstance(key, bytes):
            key = key.decode("utf-8")
        client_ids.append(key.split("client-active-")[1])
    l2_clients = authority.get_l2_clients()
    # NOTE(review): ids taken from key names are strings; the intersection
    # only matches if get_l2_clients() yields strings too - confirm.
    return list(set(client_ids).intersection(set(l2_clients)))


def _load_client_active(client_id):
    """Return the decoded heartbeat record of a client, or None if missing."""
    redis = __redisManager.getRedis()
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    return json.loads(val)


# Get the client's IP
def getActiveClientIP(client_id):
    """Return the host stored in the client's heartbeat, or None if inactive."""
    record = _load_client_active(client_id)
    if record is None:
        return None
    return record[0]


# Get the client's THS state
def getTHSState(client_id):
    """Return the ``thsDead`` flag stored in the heartbeat, or None if inactive."""
    record = _load_client_active(client_id)
    if record is None:
        return None
    return record[1]


# Save volume figures
def saveCodeVolumn(datas):
    """Store the max and latest volume of every code in Redis.

    Args:
        datas: Mapping of code -> mapping with ``max_volumn`` and
            ``latest_volumn``. Keys expire after ``tool.get_expire()``.
    """
    redis = __redisManager.getRedis()
    for key in datas:
        redis.setex("volumn-max-{}".format(key), tool.get_expire(),
                    datas[key]["max_volumn"])
        redis.setex("volumn-latest-{}".format(key), tool.get_expire(),
                    datas[key]["latest_volumn"])


if __name__ == '__main__':
    print(getActiveClientIP(3))