Administrator
2022-10-27 6e71fbcb119e7068ba35380edaa5cc66e7c71f1b
data_process.py
@@ -1,19 +1,8 @@
# 数据处理
import decimal
import json
import logging
import time as t
import authority
import mysql_data
import redis_manager
import gpcode_manager
# 统计今日卖出
# 统计今日买入
import ths_util
import tool
from code_data_util import ZYLTGBUtil
__redisManager = redis_manager.RedisManager(0)
@@ -58,112 +47,6 @@
    return data
def parseL2TradeQueueData(str):
    dict = json.loads(str)
    data = dict["data"]
    code = data["code"]
    trade_data = data["data"]
    return code, trade_data
# 代码对应的价格是否正确
def is_same_code_with_price(code, price):
    # 昨日收盘价
    price_close = gpcode_manager.get_price_pre(code)
    max_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("1.1"))
    min_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("0.9"))
    if min_price <= decimal.Decimal(str(price)) <= max_price:
        return True
    return False
# 保存L2交易队列
def saveL2TradeQueueData(code, data):
    redis = __redisManager.getRedis()
    data_str = json.dumps(data)
    key = "trade-queue-{}".format(code)
    # 保存5s的数据
    redis.setex(key, 5, data_str)
# 获取L2交易队列
def getL2TradeQueueData(code):
    redis = __redisManager.getRedis()
    key = "trade-queue-{}".format(code)
    data_str = redis.get(key)
    if data_str is None or len(data_str) <= 0:
        return None
    return json.loads(data_str)
# 保存自由流通市值
def saveZYLTSZ(datasList):
    mysqldb = mysql_data.Mysqldb()
    for data in datasList:
        # 保存
        _dict = {"_id": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgb_unit"],
                 "update_time": int(round(t.time() * 1000))}
        if float(data["zyltgb"]) > 0:
            # 保存10天
            ZYLTGBUtil.save(data["code"], data["zyltgb"], data["zyltgb_unit"])
            result = mysqldb.select_one("select * from ths_zylt where _id='{}'".format(data["code"]))
            if result is None:
                mysqldb.execute("insert into ths_zylt(_id,zyltgb,zyltgb_unit,update_time) values ('{}','{}',{},{})".format(data["code"],data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000)))
            else:
                mysqldb.execute("update ths_zylt set zyltgb='{}',zyltgb_unit={},update_time={} where _id='{}'".format(data["zyltgb"],data["zyltgb_unit"],round(t.time()*1000),data["code"]))
def saveClientActive(client_id, host, thsDead):
    if client_id <= 0:
        return
    redis = __redisManager.getRedis();
    redis.setex("client-active-{}".format(client_id), 10, json.dumps((host, thsDead)))
    ths_util.set_ths_dead_state(client_id, thsDead)
def getValidL2Clients():
    redis = __redisManager.getRedis();
    keys = redis.keys("client-active-*")
    client_ids = []
    for k in keys:
        _id = k.split("client-active-")[1]
        # 客户端同花顺没卡死才能加入
        if not ths_util.is_ths_dead(_id):
            client_ids.append(int(_id))
    l2_clients = authority.get_l2_clients()
    return list(set(client_ids).intersection(set(l2_clients)))
# 获取客户端IP
def getActiveClientIP(client_id):
    redis = __redisManager.getRedis();
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    val = json.loads(val)
    return val[0]
# 获取客户端同花顺状态
def getTHSState(client_id):
    redis = __redisManager.getRedis();
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    val = json.loads(val)
    return val[1]
# 保存量能
def saveCodeVolumn(datas):
    redis = __redisManager.getRedis()
    for key in datas:
        k = "volumn-max-{}".format(key)
        redis.setex(k, tool.get_expire(), datas[key]["max_volumn"])
        k = "volumn-latest-{}".format(key)
        redis.setex(k, tool.get_expire(), datas[key]["latest_volumn"])
if __name__ == '__main__':
    print(getActiveClientIP(3))
    pass