# 数据处理 import decimal import json import logging import time as t import authority import mysql_data import redis_manager import gpcode_manager # 统计今日卖出 # 统计今日买入 import ths_util import tool from code_data_util import ZYLTGBUtil __redisManager = redis_manager.RedisManager(0) def parse(str): dict = json.loads(str) return dict def toJson(_dict): return json.dumps(_dict) def parseType(str): try: dict = json.loads(str) return dict["type"] except Exception as e: logging.exception(e) return -1 def parseGPCode(str): dict = json.loads(str) data = dict["data"] return data def parseList(str): _dict = json.loads(str) data = _dict["data"] if data is not None: return data else: return [] def parseData(str): _dict = json.loads(str) data = _dict["data"] return data def parseL2TradeQueueData(str): dict = json.loads(str) data = dict["data"] code = data["code"] trade_data = data["data"] return code, trade_data # 代码对应的价格是否正确 def is_same_code_with_price(code, price): # 昨日收盘价 price_close = gpcode_manager.get_price_pre(code) max_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("1.1")) min_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("0.9")) if min_price <= decimal.Decimal(str(price)) <= max_price: return True return False # 保存L2交易队列 def saveL2TradeQueueData(code, data): redis = __redisManager.getRedis() data_str = json.dumps(data) key = "trade-queue-{}".format(code) # 保存5s的数据 redis.setex(key, 5, data_str) # 获取L2交易队列 def getL2TradeQueueData(code): redis = __redisManager.getRedis() key = "trade-queue-{}".format(code) data_str = redis.get(key) if data_str is None or len(data_str) <= 0: return None return json.loads(data_str) # 保存自由流通市值 def saveZYLTSZ(datasList): mysqldb = mysql_data.Mysqldb() for data in datasList: # 保存 _dict = {"_id": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgb_unit"], "update_time": int(round(t.time() * 1000))} if float(data["zyltgb"]) > 0: # 保存10天 ZYLTGBUtil.save(data["code"], data["zyltgb"], data["zyltgb_unit"]) result = mysqldb.select_one("select * from ths_zylt where _id='{}'".format(data["code"])) if result is None: mysqldb.execute("insert into ths_zylt(_id,zyltgb,zyltgb_unit,update_time) values ('{}','{}',{},{})".format(data["code"],data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000))) else: mysqldb.execute("update ths_zylt set zyltgb='{}',zyltgb_unit={},update_time={} where _id='{}'".format(data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000),data["code"])) def saveClientActive(client_id, host, thsDead): if client_id <= 0: return redis = __redisManager.getRedis(); redis.setex("client-active-{}".format(client_id), 10, json.dumps((host, thsDead))) ths_util.set_ths_dead_state(client_id, thsDead) def getValidL2Clients(): redis = __redisManager.getRedis(); keys = redis.keys("client-active-*") client_ids = [] for k in keys: _id = k.split("client-active-")[1] # 客户端同花顺没卡死才能加入 if not ths_util.is_ths_dead(_id): client_ids.append(int(_id)) l2_clients = authority.get_l2_clients() return list(set(client_ids).intersection(set(l2_clients))) # 获取客户端IP def getActiveClientIP(client_id): redis = __redisManager.getRedis(); val = redis.get("client-active-{}".format(client_id)) if val is None: return None val = json.loads(val) return val[0] # 获取客户端同花顺状态 def getTHSState(client_id): redis = __redisManager.getRedis(); val = redis.get("client-active-{}".format(client_id)) if val is None: return None val = json.loads(val) return val[1] # 保存量能 def saveCodeVolumn(datas): redis = __redisManager.getRedis() for key in datas: k = "volumn-max-{}".format(key) redis.setex(k, tool.get_expire(), datas[key]["max_volumn"]) k = "volumn-latest-{}".format(key) redis.setex(k, tool.get_expire(), datas[key]["latest_volumn"]) if __name__ == '__main__': print(getActiveClientIP(3))