Administrator
2023-01-06 59fba698b03a51a8da5b56a919ebbf94d4784f74
data_process.py
@@ -1,42 +1,10 @@
# 数据处理
import decimal
import json
import time as t
import random
import datetime
import mysql
import logging
import redis_manager
import gpcode_manager
import mongo_data
# 统计今日卖出
# 统计今日买入
import tool
from log import logger_l2_error
__redisManager = redis_manager.RedisManager(0)
def _mysql_insert_data(day, code, item, conn):
    try:
        with conn.cursor() as cursor:
            sql = f"insert into level2_data(day,code,time,price,num,limit_price,operate_type,cancel_time,cancel_time_unit, md5,create_time) values ('{day}','{code}','{item['time']}','{item['price']}',{item['num']},{item['limitPrice']},{item['operateType']},{item['cancelTime']},{item['cancelTimeUnit']},'{item['md5']}',now())"
            print(sql)
            cursor.execute(sql)
            conn.commit()
    except Exception as e:
        conn.rollback()
def _mysql_update_data(item, conn):
    try:
        with conn.cursor() as cursor:
            sql = "update level2_data set re = {}, update_time=now() where md5='{}'".format(item['re'], item['md5'])
            print(sql)
            cursor.execute(sql)
            conn.commit()
    except Exception as e:
        conn.rollback()
def parse(str):
@@ -49,14 +17,22 @@
def parseType(str):
    dict = json.loads(str)
    return dict["type"]
    try:
        dict = json.loads(str)
        return dict["type"]
    except Exception as e:
        logging.exception(e)
        logger_l2_error.error(str)
        print(str)
        return -1
def parseGPCode(str):
    dict = json.loads(str)
    data = dict["data"]
    return data
    add = dict.get("add")
    return data, add
def parseList(str):
@@ -74,120 +50,5 @@
    return data
def parseL2TradeQueueData(str):
    dict = json.loads(str)
    data = dict["data"]
    code = data["code"]
    trade_data = data["data"]
    return code, trade_data
# 代码对应的价格是否正确
def is_same_code_with_price(code, price):
    # 昨日收盘价
    price_close = gpcode_manager.get_price_pre(code)
    max_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("1.1"))
    min_price = tool.to_price(decimal.Decimal(str(price_close)) * decimal.Decimal("0.9"))
    if min_price <= decimal.Decimal(str(price)) <= max_price:
        return True
    return False
# 保存L2交易队列
def saveL2TradeQueueData(code, data):
    redis = __redisManager.getRedis()
    data_str = json.dumps(data)
    key = "trade-queue-{}".format(code)
    # 保存5s的数据
    redis.setex(key, 5, data_str)
# 获取L2交易队列
def getL2TradeQueueData(code):
    redis = __redisManager.getRedis()
    key = "trade-queue-{}".format(code)
    data_str = redis.get(key)
    if data_str is None or len(data_str) <= 0:
        return None
    return json.loads(data_str)
def _getIndustry(datas):
    ors = []
    codes = set()
    for data in datas:
        codes.add(data["code"])
    for code in codes:
        ors.append({'first_code': code})
    result = mongo_data.find("ths-industry", {'$or': ors})
    _fname = None
    for a in result:
        _fname = a["_id"]
        break
    print("最终的二级行业名称为:", _fname)
    return _fname
def saveIndustryCode(datasList):
    for datas in datasList:
        # 查询这批数据所属行业
        industry_name = _getIndustry(datas);
        _list = []
        for data in datas:
            # 保存
            _dict = {"_id": data["code"]}
            _dict["second_industry"] = industry_name
            _dict["zyltgb"] = data["zyltgb"]
            _dict["zyltgb_unit"] = data["zyltgb_unit"]
            _dict["update_time"] = int(round(t.time() * 1000))
            _list.append(_dict)
        mongo_data.save("ths-industry-codes", _list)
# 保存自由流通市值
def saveZYLTSZ(datasList):
    _list = []
    for data in datasList:
        # 保存
        _dict = {"_id": data["code"], "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgb_unit"],
                 "update_time": int(round(t.time() * 1000))}
        if float(data["zyltgb"]) > 0:
            _list.append(_dict)
    mongo_data.save("ths-zylt", _list)
def saveClientActive(client_id, host):
    if client_id <= 0:
        return
    redis = __redisManager.getRedis();
    redis.setex("client-active-{}".format(client_id), 10, host)
def getValidClients():
    redis = __redisManager.getRedis();
    keys = redis.keys("client-active-*")
    client_ids = []
    for k in keys:
        _id = k.split("client-active-")[1]
        client_ids.append(_id)
    return client_ids
def getActiveClientIP(client_id):
    redis = __redisManager.getRedis();
    return redis.get("client-active-{}".format(client_id))
# 保存量能
def saveCodeVolumn(datas):
    redis = __redisManager.getRedis()
    for key in datas:
        k = "volumn-max-{}".format(key)
        redis.setex(k, tool.get_expire(), datas[key]["max_volumn"])
        k = "volumn-latest-{}".format(key)
        redis.setex(k, tool.get_expire(), datas[key]["latest_volumn"])
if __name__ == '__main__':
    print(getActiveClientIP(3))
    pass