# Data processing
|
import decimal
|
import json
|
import logging
|
import time as t
|
|
import authority
|
import redis_manager
|
import gpcode_manager
|
import mongo_data
|
|
# Count today's sells
|
# Count today's buys
|
import ths_util
|
import tool
|
from code_data_util import ZYLTGBUtil
|
|
# Module-wide Redis manager shared by every helper below.
# NOTE(review): the argument 0 is presumably the Redis DB index — confirm
# against redis_manager.RedisManager.
__redisManager = redis_manager.RedisManager(0)
|
|
|
def _mysql_insert_data(day, code, item, conn):
    """Insert one record into the ``level2_data`` table.

    Args:
        day: Value stored in the ``day`` column.
        code: Value stored in the ``code`` column.
        item: Mapping providing ``time``, ``price``, ``num``, ``limitPrice``,
            ``operateType``, ``cancelTime``, ``cancelTimeUnit`` and ``md5``.
        conn: Open database connection. It is committed on success and
            rolled back on any failure.

    Failures are logged and then swallowed, so the call stays best-effort
    and never raises to the caller.
    """
    # Values are bound by the driver rather than spliced into the SQL text,
    # so quotes or SQL fragments inside the payload cannot alter the statement.
    # NOTE(review): assumes a driver with the "%s" paramstyle (PyMySQL,
    # MySQLdb, mysql-connector) — confirm against the connection factory.
    sql = (
        "insert into level2_data(day,code,time,price,num,limit_price,"
        "operate_type,cancel_time,cancel_time_unit, md5,create_time) "
        "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,now())"
    )
    try:
        # Built inside the try block so a missing key still triggers rollback.
        params = (
            day,
            code,
            item['time'],
            item['price'],
            item['num'],
            item['limitPrice'],
            item['operateType'],
            item['cancelTime'],
            item['cancelTimeUnit'],
            item['md5'],
        )
        with conn.cursor() as cursor:
            logging.debug("%s %s", sql, params)
            cursor.execute(sql, params)
        conn.commit()
    except Exception as e:
        # Previously the error vanished without a trace; record it first.
        logging.exception(e)
        conn.rollback()
|
|
|
def _mysql_update_data(item, conn):
    """Update the ``re`` column of the ``level2_data`` row matching ``md5``.

    Args:
        item: Mapping providing ``re`` (new value) and ``md5`` (row selector).
        conn: Open database connection. It is committed on success and
            rolled back on any failure.

    Failures are logged and then swallowed, so the call stays best-effort
    and never raises to the caller.
    """
    # Bound parameters keep payload text from being interpreted as SQL.
    # NOTE(review): assumes a driver with the "%s" paramstyle (PyMySQL,
    # MySQLdb, mysql-connector) — confirm against the connection factory.
    sql = "update level2_data set re = %s, update_time=now() where md5=%s"
    try:
        # Built inside the try block so a missing key still triggers rollback.
        params = (item['re'], item['md5'])
        with conn.cursor() as cursor:
            logging.debug("%s %s", sql, params)
            cursor.execute(sql, params)
        conn.commit()
    except Exception as e:
        # Previously the error vanished without a trace; record it first.
        logging.exception(e)
        conn.rollback()
|
|
|
def parse(str):
    """Decode the JSON text *str* and return the resulting Python object."""
    return json.loads(str)
|
|
|
def toJson(_dict):
    """Serialize *_dict* to a JSON string using the ``json.dumps`` defaults."""
    encoded = json.dumps(_dict)
    return encoded
|
|
|
def parseType(str):
    """Return the ``type`` field of the JSON message *str*.

    Any failure (invalid JSON, missing key, non-mapping payload) is logged
    with its traceback and reported as ``-1``.
    """
    try:
        return json.loads(str)["type"]
    except Exception as err:
        logging.exception(err)
        return -1
|
|
|
def parseGPCode(str):
    """Decode the JSON message *str* and return its ``data`` field.

    Raises ``KeyError`` when the message has no ``data`` key.
    """
    message = json.loads(str)
    return message["data"]
|
|
|
def parseList(str):
    """Return the ``data`` field of the JSON message *str*.

    A JSON ``null`` under ``data`` is replaced by an empty list; any other
    value is returned unchanged. Raises ``KeyError`` when ``data`` is absent.
    """
    payload = json.loads(str)["data"]
    return [] if payload is None else payload
|
|
|
def parseData(str):
    """Decode the JSON message *str* and hand back whatever sits under ``data``.

    Raises ``KeyError`` when the message has no ``data`` key.
    """
    return json.loads(str)["data"]
|
|
|
def parseL2TradeQueueData(str):
    """Split an L2 trade-queue message into ``(code, trade_data)``.

    The JSON message is expected to carry ``data.code`` and ``data.data``;
    a missing key raises ``KeyError``.
    """
    payload = json.loads(str)["data"]
    return payload["code"], payload["data"]
|
|
|
# Check whether the price is plausible for the given code
def is_same_code_with_price(code, price):
    """Return True when *price* lies within 0.9x..1.1x of the previous close.

    The previous close comes from ``gpcode_manager.get_price_pre(code)``;
    both bounds are passed through ``tool.to_price`` and are inclusive.
    """
    # Previous day's closing price
    prev_close = decimal.Decimal(str(gpcode_manager.get_price_pre(code)))
    upper_bound = tool.to_price(prev_close * decimal.Decimal("1.1"))
    lower_bound = tool.to_price(prev_close * decimal.Decimal("0.9"))
    candidate = decimal.Decimal(str(price))
    return True if lower_bound <= candidate <= upper_bound else False
|
|
|
# Save the L2 trade queue
def saveL2TradeQueueData(code, data):
    """Cache *data* as JSON in Redis under ``trade-queue-<code>``."""
    client = __redisManager.getRedis()
    # Keep the data for 5s only
    client.setex("trade-queue-{}".format(code), 5, json.dumps(data))
|
|
|
# Fetch the L2 trade queue
def getL2TradeQueueData(code):
    """Return the decoded trade queue stored under ``trade-queue-<code>``.

    Returns ``None`` when the key is absent or holds an empty value.
    """
    cached = __redisManager.getRedis().get("trade-queue-{}".format(code))
    if cached is not None and len(cached) > 0:
        return json.loads(cached)
    return None
|
|
|
def _getIndustry(datas):
    """Find the industry record that matches any code in *datas*.

    Args:
        datas: Iterable of mappings, each providing a ``code`` key.

    Returns:
        The ``_id`` of the first ``ths-industry`` document whose
        ``first_code`` equals one of the codes, or ``None`` when nothing
        matches or *datas* contains no codes.
    """
    codes = {data["code"] for data in datas}

    _fname = None
    # MongoDB rejects an empty ``$or`` array, so only query when at least
    # one code is available.
    if codes:
        ors = [{'first_code': code} for code in codes]
        result = mongo_data.find("ths-industry", {'$or': ors})
        # Only the first matching document is used.
        for a in result:
            _fname = a["_id"]
            break
    print("最终的二级行业名称为:", _fname)
    return _fname
|
|
|
def saveIndustryCode(datasList):
    """Store every batch of codes in ``ths-industry-codes`` with its industry.

    Each element of *datasList* is a batch of mappings providing ``code``,
    ``zyltgb`` and ``zyltgb_unit``. The industry is resolved once per batch
    through ``_getIndustry`` and written to every document of that batch.
    """
    # NOTE(review): the per-batch save is assumed to sit inside the outer
    # loop — confirm against the original layout.
    for batch in datasList:
        # Look up the industry this batch belongs to
        industry = _getIndustry(batch)
        documents = [
            {
                "_id": entry["code"],
                "second_industry": industry,
                "zyltgb": entry["zyltgb"],
                "zyltgb_unit": entry["zyltgb_unit"],
                # Millisecond timestamp taken per document
                "update_time": int(round(t.time() * 1000)),
            }
            for entry in batch
        ]
        mongo_data.save("ths-industry-codes", documents)
|
|
|
# Save the free-float market value
def saveZYLTSZ(datasList):
    """Persist the positive ``zyltgb`` entries of *datasList*.

    Entries whose ``zyltgb`` converts to a float greater than 0 are passed
    to ``ZYLTGBUtil.save`` and collected; the collected documents are then
    written to ``ths-zylt`` in a single ``mongo_data.save`` call.
    """
    # The handle itself is unused; the call is kept so behaviour is unchanged.
    __redisManager.getRedis()
    documents = []
    for entry in datasList:
        document = {
            "_id": entry["code"],
            "zyltgb": entry["zyltgb"],
            "zyltgb_unit": entry["zyltgb_unit"],
            "update_time": int(round(t.time() * 1000)),
        }
        if not float(entry["zyltgb"]) > 0:
            continue
        documents.append(document)
        # Keep for 10 days
        # NOTE(review): assumed to run only for positive values — confirm
        # against the original nesting.
        ZYLTGBUtil.save(entry["code"], entry["zyltgb"], entry["zyltgb_unit"])
    mongo_data.save("ths-zylt", documents)
|
|
|
def saveClientActive(client_id, host, thsDead):
    """Record a client heartbeat and its THS dead/alive state.

    Does nothing when *client_id* is not positive. Otherwise stores the JSON
    pair ``(host, thsDead)`` under ``client-active-<client_id>`` with an
    expiry argument of 10, then forwards the state to
    ``ths_util.set_ths_dead_state``.
    """
    if client_id <= 0:
        return
    client = __redisManager.getRedis()
    active_key = "client-active-{}".format(client_id)
    heartbeat = json.dumps((host, thsDead))
    client.setex(active_key, 10, heartbeat)
    ths_util.set_ths_dead_state(client_id, thsDead)
|
|
|
def getValidL2Clients():
    """Return the ids of active clients that are also listed as L2 clients.

    Active ids are taken from the suffix of every ``client-active-*`` key,
    skipping those that ``ths_util.is_ths_dead`` reports as dead. The result
    is the intersection with ``authority.get_l2_clients()``, in no
    particular order.
    """
    active_keys = __redisManager.getRedis().keys("client-active-*")
    # A client is only eligible while its THS instance is not frozen
    alive_ids = [
        suffix
        for suffix in (key.split("client-active-")[1] for key in active_keys)
        if not ths_util.is_ths_dead(suffix)
    ]
    authorized = authority.get_l2_clients()
    return list(set(alive_ids).intersection(set(authorized)))
|
|
|
# Get the client's IP
def getActiveClientIP(client_id):
    """Return the host stored in the client's heartbeat entry.

    Reads ``client-active-<client_id>`` and returns the first element of
    the JSON value, or ``None`` when the key does not exist.
    """
    stored = __redisManager.getRedis().get("client-active-{}".format(client_id))
    if stored is not None:
        return json.loads(stored)[0]
    return None
|
|
|
# Get the client's THS state
def getTHSState(client_id):
    """Return the THS state stored in the client's heartbeat entry.

    Reads ``client-active-<client_id>`` and returns the second element of
    the JSON value, or ``None`` when the key does not exist.
    """
    active_key = "client-active-{}".format(client_id)
    stored = __redisManager.getRedis().get(active_key)
    if stored is None:
        return None
    heartbeat = json.loads(stored)
    return heartbeat[1]
|
|
|
# Save volume figures
def saveCodeVolumn(datas):
    """Store the max and latest volume of every code in Redis.

    *datas* maps a code to a mapping with ``max_volumn`` and
    ``latest_volumn``. The values go to ``volumn-max-<code>`` and
    ``volumn-latest-<code>``, each with the expiry from ``tool.get_expire()``.
    """
    client = __redisManager.getRedis()
    for code in datas:
        volumes = datas[code]
        client.setex("volumn-max-{}".format(code), tool.get_expire(), volumes["max_volumn"])
        client.setex("volumn-latest-{}".format(code), tool.get_expire(), volumes["latest_volumn"])
|
|
|
# Manual check: when run as a script, print the host stored for client 3.
if __name__ == '__main__':
    print(getActiveClientIP(3))
|