| | |
| | | # 数据处理 |
| | | import decimal |
| | | import json |
| | | import logging |
| | | import time as t |
| | | |
| | | import authority |
| | | import redis_manager |
| | | import gpcode_manager |
| | | import mongo_data |
| | | |
| | | # 统计今日卖出 |
| | | # 统计今日买入 |
| | | import tool |
| | | from code_data_util import ZYLTGBUtil |
| | | from log import logger_l2_error |
| | | |
# Shared Redis manager; the Redis-backed helpers in this module obtain their
# client from it via getRedis().
# NOTE(review): the argument 0 is presumably the Redis database index —
# confirm against redis_manager.RedisManager.
__redisManager = redis_manager.RedisManager(0)
| | | |
| | | |
def _mysql_insert_data(day, code, item, conn):
    """Insert one record into the ``level2_data`` table.

    Args:
        day: Value stored in the ``day`` column.
        code: Value stored in the ``code`` column.
        item: Mapping providing ``time``, ``price``, ``num``, ``limitPrice``,
            ``operateType``, ``cancelTime``, ``cancelTimeUnit`` and ``md5``.
        conn: Database connection whose ``cursor()`` result works as a
            context manager and which offers ``commit()`` / ``rollback()``.

    The write is best-effort: on any failure the transaction is rolled back
    and the error is logged; nothing is raised to the caller.
    """
    # Values are bound by the driver instead of being interpolated into the
    # SQL text, so quotes inside ``item`` cannot break or alter the statement.
    # NOTE(review): assumes a driver using the ``%s`` paramstyle
    # (e.g. PyMySQL / MySQLdb) — confirm.
    sql = (
        "insert into level2_data(day,code,time,price,num,limit_price,"
        "operate_type,cancel_time,cancel_time_unit, md5,create_time) "
        "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,now())"
    )
    try:
        params = (
            day,
            code,
            item['time'],
            item['price'],
            item['num'],
            item['limitPrice'],
            item['operateType'],
            item['cancelTime'],
            item['cancelTimeUnit'],
            item['md5'],
        )
        with conn.cursor() as cursor:
            logging.debug("%s %s", sql, params)
            cursor.execute(sql, params)
        conn.commit()
    except Exception:
        # Keep the original swallow-and-rollback contract, but leave a trace.
        logging.exception("failed to insert level2_data row")
        conn.rollback()
| | | |
| | | |
def _mysql_update_data(item, conn):
    """Update the ``re`` column of the ``level2_data`` row matching ``md5``.

    Args:
        item: Mapping providing ``re`` (new value) and ``md5`` (row selector).
        conn: Database connection whose ``cursor()`` result works as a
            context manager and which offers ``commit()`` / ``rollback()``.

    The write is best-effort: on any failure the transaction is rolled back
    and the error is logged; nothing is raised to the caller.
    """
    # Bound parameters replace string formatting so values containing quotes
    # cannot break or alter the statement.
    # NOTE(review): assumes a driver using the ``%s`` paramstyle
    # (e.g. PyMySQL / MySQLdb) — confirm.
    sql = "update level2_data set re = %s, update_time=now() where md5=%s"
    try:
        params = (item['re'], item['md5'])
        with conn.cursor() as cursor:
            logging.debug("%s %s", sql, params)
            cursor.execute(sql, params)
        conn.commit()
    except Exception:
        # Keep the original swallow-and-rollback contract, but leave a trace.
        logging.exception("failed to update level2_data row")
        conn.rollback()
| | | |
| | | |
def parse(str):
    """Return the ``type`` field of a JSON-encoded message.

    Args:
        str: JSON text of the message (the parameter name shadows the builtin
            but is kept for caller compatibility).

    Returns:
        The value stored under ``"type"``, or ``-1`` when the text cannot be
        decoded or has no such key. On failure the exception is logged, the
        raw text is written to ``logger_l2_error`` and echoed to stdout.
    """
    # NOTE(review): the line(s) between the signature and the first ``return``
    # were lost in the extracted source. The ``try`` / ``json.loads`` opening
    # below is reconstructed from the sibling parsers and the surviving
    # ``except`` clause — confirm against the upstream file.
    try:
        message = json.loads(str)
        return message["type"]
    except Exception as e:
        logging.exception(e)
        logger_l2_error.error(str)

    print(str)
    return -1
| | | |
| | | |
def parseGPCode(str):
    """Return the ``data`` field of a JSON-encoded message.

    Args:
        str: JSON text containing a top-level ``"data"`` key (the parameter
            name shadows the builtin but is kept for caller compatibility).

    Returns:
        The value stored under ``"data"``.

    Raises:
        json.JSONDecodeError: If ``str`` is not valid JSON.
        KeyError: If the message has no ``"data"`` key.
    """
    # NOTE(review): the source also contained ``add = dict.get("add")`` and
    # ``return data, add`` after the first return. Those statements could
    # never execute, so they are dropped here; if callers actually expect a
    # ``(data, add)`` tuple, the early return was the stale line — confirm.
    message = json.loads(str)
    return message["data"]
| | | |
| | | |
def parseList(str):
    """Return the ``data`` field of a JSON-encoded message.

    Args:
        str: JSON text (the parameter name shadows the builtin but is kept
            for caller compatibility).

    Returns:
        The value stored under ``"data"``.
    """
    # NOTE(review): the statements that defined ``data`` were lost in the
    # extracted source, leaving only ``return data``. The decoding below
    # mirrors the sibling ``parseGPCode`` — confirm against the upstream file.
    message = json.loads(str)
    data = message["data"]
    return data
| | | |
| | | |
def parseL2TradeQueueData(str):
    """Decode a trade-queue message into ``(code, trade_data)``.

    The JSON text must hold an object under ``"data"`` which itself carries
    ``"code"`` and ``"data"`` entries; those two values are returned as a
    tuple, in that order.
    """
    payload = json.loads(str)["data"]
    return payload["code"], payload["data"]
| | | |
| | | |
def is_same_code_with_price(code, price):
    """Check whether ``price`` is plausible for ``code``.

    The previous day's closing price is fetched through
    ``gpcode_manager.get_price_pre`` and scaled by 1.1 and 0.9; each bound is
    passed through ``tool.to_price``. Returns ``True`` when ``price`` lies
    inside the closed interval between the two bounds, ``False`` otherwise.
    """
    # Previous day's closing price
    prev_close = gpcode_manager.get_price_pre(code)
    upper_bound = tool.to_price(decimal.Decimal(str(prev_close)) * decimal.Decimal("1.1"))
    lower_bound = tool.to_price(decimal.Decimal(str(prev_close)) * decimal.Decimal("0.9"))
    candidate = decimal.Decimal(str(price))
    return bool(lower_bound <= candidate <= upper_bound)
| | | |
| | | |
def saveL2TradeQueueData(code, data):
    """Store the L2 trade queue of ``code`` in Redis.

    ``data`` is serialized to JSON and written under ``trade-queue-<code>``
    with a 5-second expiry.
    """
    client = __redisManager.getRedis()
    serialized = json.dumps(data)
    # Entry is kept for 5 seconds only
    client.setex(f"trade-queue-{code}", 5, serialized)
| | | |
| | | |
def getL2TradeQueueData(code):
    """Load the L2 trade queue of ``code`` from Redis.

    Returns the JSON-decoded value stored under ``trade-queue-<code>``, or
    ``None`` when the key is absent or holds an empty value.
    """
    raw = __redisManager.getRedis().get(f"trade-queue-{code}")
    if raw is not None and len(raw) > 0:
        return json.loads(raw)
    return None
| | | |
| | | |
def _getIndustry(datas):
    """Look up the industry name for a batch of records.

    Args:
        datas: Iterable of mappings, each providing a ``"code"`` entry.

    Returns:
        The ``_id`` of the first ``ths-industry`` document whose
        ``first_code`` equals any of the batch's codes, or ``None`` when the
        batch is empty or nothing matches.
    """
    codes = {data["code"] for data in datas}

    industry_name = None
    # An empty ``$or`` array is rejected by MongoDB, so the lookup is skipped
    # when the batch carries no codes.
    # NOTE(review): presumes mongo_data.find forwards the filter unchanged —
    # confirm.
    if codes:
        conditions = [{'first_code': code} for code in codes]
        result = mongo_data.find("ths-industry", {'$or': conditions})
        # Only the first matching document is used.
        first = next(iter(result), None)
        if first is not None:
            industry_name = first["_id"]

    print("最终的二级行业名称为:", industry_name)
    return industry_name
| | | |
| | | |
def saveIndustryCode(datasList):
    """Persist industry membership for several batches of codes.

    For each batch in ``datasList`` the industry name is resolved through
    ``_getIndustry`` and one document per record (``_id``,
    ``second_industry``, ``zyltgb``, ``zyltgb_unit``, ``update_time`` in
    milliseconds) is saved to ``ths-industry-codes``.
    """
    # NOTE(review): indentation was lost in the extracted source. The save is
    # placed once per batch because the document list is reset per batch —
    # confirm against the upstream file.
    for batch in datasList:
        # Resolve the industry this batch belongs to
        second_industry = _getIndustry(batch)
        documents = [
            {
                "_id": record["code"],
                "second_industry": second_industry,
                "zyltgb": record["zyltgb"],
                "zyltgb_unit": record["zyltgb_unit"],
                "update_time": int(round(t.time() * 1000)),
            }
            for record in batch
        ]
        mongo_data.save("ths-industry-codes", documents)
| | | |
| | | |
def saveZYLTSZ(datasList):
    """Save free-float market value records.

    Records whose ``zyltgb`` converts to a float greater than zero are
    handed to ``ZYLTGBUtil.save`` and collected; the collected documents
    (``_id``, ``zyltgb``, ``zyltgb_unit``, ``update_time`` in milliseconds)
    are then saved to ``ths-zylt`` in a single call.
    """
    # The client is not used below; the call is retained in case getRedis()
    # has side effects.
    __redisManager.getRedis()
    documents = []
    for record in datasList:
        document = {
            "_id": record["code"],
            "zyltgb": record["zyltgb"],
            "zyltgb_unit": record["zyltgb_unit"],
            "update_time": int(round(t.time() * 1000)),
        }
        # NOTE(review): indentation was lost in the extracted source; the
        # ZYLTGBUtil.save call is assumed to sit inside this check — confirm.
        if float(record["zyltgb"]) > 0:
            documents.append(document)
            # Original comment here: "keep for 10 days"
            ZYLTGBUtil.save(record["code"], record["zyltgb"], record["zyltgb_unit"])
    mongo_data.save("ths-zylt", documents)
| | | |
| | | |
def saveClientActive(client_id, host, thsDead):
    """Record a client heartbeat in Redis.

    Stores ``(host, thsDead)`` as JSON under ``client-active-<client_id>``
    with a 10-second expiry. Nothing is written when ``client_id`` is zero
    or negative.
    """
    if client_id <= 0:
        return
    client = __redisManager.getRedis()
    heartbeat_key = f"client-active-{client_id}"
    heartbeat = json.dumps((host, thsDead))
    client.setex(heartbeat_key, 10, heartbeat)
| | | |
| | | |
def getValidL2Clients():
    """Return the ids of clients that are both active and L2-authorised.

    Active ids are the suffixes of the Redis keys matching
    ``client-active-*``; they are intersected with
    ``authority.get_l2_clients()`` and returned as a list in no particular
    order.
    """
    # NOTE(review): the suffixes are whatever type Redis returns for keys;
    # the intersection only matches if authority.get_l2_clients() yields
    # the same type — confirm.
    client = __redisManager.getRedis()
    active_ids = [key.split("client-active-")[1] for key in client.keys("client-active-*")]
    authorised = authority.get_l2_clients()
    return list(set(active_ids) & set(authorised))
| | | |
| | | |
def getActiveClientIP(client_id):
    """Return the host recorded for an active client.

    Reads ``client-active-<client_id>`` from Redis and returns the first
    element of the JSON-decoded value, or ``None`` when the key is absent.
    """
    raw = __redisManager.getRedis().get(f"client-active-{client_id}")
    if raw is not None:
        return json.loads(raw)[0]
    return None
| | | |
| | | |
def getTHSState(client_id):
    """Return the THS state flag recorded for an active client.

    Reads ``client-active-<client_id>`` from Redis and returns the second
    element of the JSON-decoded value, or ``None`` when the key is absent.
    """
    raw = __redisManager.getRedis().get(f"client-active-{client_id}")
    if raw is not None:
        return json.loads(raw)[1]
    return None
| | | |
| | | |
def saveCodeVolumn(datas):
    """Store per-code volume figures in Redis.

    For every key of ``datas`` the entry's ``max_volumn`` is written to
    ``volumn-max-<key>`` and its ``latest_volumn`` to
    ``volumn-latest-<key>``, each with the expiry given by
    ``tool.get_expire()``.
    """
    client = __redisManager.getRedis()
    # (Redis key prefix, field inside each entry), in write order
    targets = (("volumn-max", "max_volumn"), ("volumn-latest", "latest_volumn"))
    for code in datas:
        for prefix, field in targets:
            client.setex(f"{prefix}-{code}", tool.get_expire(), datas[code][field])
| | | |
| | | |
if __name__ == "__main__":
    # Manual check: print the host recorded for client 3.
    print(getActiveClientIP(3))