"""
|
接受客户端数据的服务器
|
"""
|
import decimal
|
import json
|
import logging
|
import random
|
import socketserver
|
import socket
|
import threading
|
import time
|
|
import alert_util
|
import client_manager
|
import code_nature_analyse
|
import code_volumn_manager
|
import constant
|
import data_process
|
import global_data_loader
|
import global_util
|
import gpcode_first_screen_manager
|
import gpcode_manager
|
import authority
|
import juejin
|
import limit_up_time_manager
|
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
|
import l2_data_util
|
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
|
import l2.l2_data_util
|
|
import ths_industry_util
|
import ths_util
|
import tool
|
from output import code_info_output
|
from third_data import hot_block_data_process, block_info, kpl_api, kpl_util
|
from third_data.code_plate_key_manager import TargetCodePlateKeyManager
|
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager, KPLLimitUpDataRecordManager
|
from ths import l2_listen_pos_health_manager
|
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager, \
|
first_code_score_manager, current_price_process_manager
|
import l2_code_operate
|
from code_data_util import ZYLTGBUtil
|
import l2.transaction_progress
|
|
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
|
logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
|
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
|
|
|
class MyTCPServer(socketserver.TCPServer):
|
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None):
|
self.pipe_juejin = pipe_juejin # 增加的参数
|
self.pipe_ui = pipe_ui
|
socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
|
|
|
# 如果使用异步的形式则需要再重写ThreadingTCPServer
|
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass
|
|
|
# 首板tick级数据
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
|
l2_data_error_dict = {}
|
last_trade_delegate_data = None
|
buy1_volumn_manager = THSBuy1VolumnManager()
|
ths_l2_trade_queue_manager = thsl2tradequeuemanager()
|
|
latest_buy1_volumn_dict = {}
|
l2_trade_queue_time_dict = {}
|
l2_save_time_dict = {}
|
l2_trade_buy_queue_dict = {}
|
tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
|
last_time = {}
|
first_tick_datas = []
|
latest_oringin_data = {}
|
last_l2_listen_health_time = {}
|
__KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
|
__TargetCodePlateKeyManager = TargetCodePlateKeyManager()
|
|
def setup(self):
|
super().setup() # 可以不调用父类的setup()方法,父类的setup方法什么都没做
|
# print("----setup方法被执行-----")
|
# print("打印传入的参数:", self.server.pipe)
|
self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()
|
|
def handle(self):
|
host = self.client_address[0]
|
super().handle() # 可以不调用父类的handler(),方法,父类的handler方法什么都没做
|
# print("-------handler方法被执行----")
|
# print(self.server)
|
# print(self.request) # 服务
|
# print("客户端地址:", self.client_address) # 客户端地址
|
# print(self.__dict__)
|
# print("- " * 30)
|
# print(self.server.__dict__)
|
# print("- " * 30)
|
sk: socket.socket = self.request
|
while True:
|
data = sk.recv(1024 * 100)
|
if len(data) == 0:
|
# print("客户端断开连接")
|
break
|
_str = str(data, encoding="gbk")
|
if len(_str) > 0:
|
# print("结果:",_str)
|
type = -1
|
try:
|
type = data_process.parseType(_str)
|
except Exception as e:
|
if str(e).find("Unterminated string starting") > -1:
|
_str = _str.replace("\n", "")
|
type = data_process.parseType(_str)
|
else:
|
print(_str)
|
return_str = "OK"
|
if type == 0:
|
try:
|
|
origin_start_time = round(time.time() * 1000)
|
__start_time = round(time.time() * 1000)
|
|
# level2盘口数据
|
day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data(
|
_str)
|
last_health_time = self.last_l2_listen_health_time.get((client, channel))
|
# --------------------------------设置L2健康状态--------------------------------
|
if last_health_time is None or __start_time - last_health_time > 1000:
|
self.last_l2_listen_health_time[(client, channel)] = __start_time
|
# 更新监听位健康状态
|
if origin_datas_count == 0:
|
l2_listen_pos_health_manager.set_unhealthy(client, channel)
|
else:
|
l2_listen_pos_health_manager.set_healthy(client, channel)
|
|
l2_log.threadIds[code] = random.randint(0, 100000)
|
if True:
|
# 间隔1s保存一条l2的最后一条数据
|
if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
|
code] >= 1000 and len(origin_datas) > 0:
|
self.l2_save_time_dict[code] = origin_start_time
|
logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1])
|
|
# 10ms的网络传输延时
|
capture_timestamp = __start_time - process_time - 10
|
# print("截图时间:", process_time)
|
__start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
|
"截图时间:{} 数据解析时间".format(process_time))
|
|
cid, pid = gpcode_manager.get_listen_code_pos(code)
|
|
__start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
|
"l2获取代码位置耗时")
|
# 判断目标代码位置是否与上传数据位置一致
|
if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
|
# l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas))
|
l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
|
# 保存l2数据条数
|
if not origin_datas:
|
# or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
|
raise Exception("无新增数据")
|
# 保存最近的数据
|
self.latest_oringin_data[code] = origin_datas
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price)
|
try:
|
# 校验客户端代码
|
l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
|
__start_time = round(time.time() * 1000)
|
if gpcode_manager.is_listen(code):
|
__start_time = l2_data_log.l2_time(code,
|
round(time.time() * 1000) - __start_time,
|
"l2外部数据预处理耗时")
|
l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
|
__start_time = l2_data_log.l2_time(code,
|
round(time.time() * 1000) - __start_time,
|
"l2数据有效处理外部耗时",
|
False)
|
# 保存原始数据数量
|
# l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
|
# if round(time.time() * 1000) - __start_time > 20:
|
# l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
|
# "异步保存原始数据条数耗时",
|
# False)
|
|
except l2_data_manager.L2DataException as l:
|
# 单价不符
|
if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
|
key = "{}-{}-{}".format(client, channel, code)
|
if key not in self.l2_data_error_dict or round(
|
time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
|
# self.l2CodeOperate.repaire_l2_data(code)
|
# todo 太敏感移除代码
|
logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
|
# 单价不一致时需要移除代码重新添加
|
l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
|
self.l2_data_error_dict[key] = round(time.time() * 1000)
|
|
except Exception as e:
|
print("异常", str(e), code)
|
logging.exception(e)
|
logger_l2_error.error("出错:{}".format(str(e)))
|
logger_l2_error.error("内容:{}".format(_str))
|
finally:
|
|
__end_time = round(time.time() * 1000)
|
# 只记录大于40ms的数据
|
if __end_time - origin_start_time > 100:
|
l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
|
"l2数据处理总耗时",
|
True)
|
except Exception as e:
|
if str(e).find("新增数据"):
|
pass
|
else:
|
logger_l2_error.exception(e)
|
|
elif type == 1:
|
# 设置股票代码
|
data_list, is_add = data_process.parseGPCode(_str)
|
ZYLTGBUtil.save_list(data_list)
|
code_list = []
|
for data in data_list:
|
code_list.append(data["code"])
|
# 获取基本信息
|
code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
|
if is_add:
|
gpcode_manager.add_gp_list(code_datas)
|
else:
|
gpcode_manager.set_gp_list(code_datas)
|
|
if not is_add:
|
# 同步同花顺目标代码
|
t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
|
t1.setDaemon(True)
|
t1.start()
|
elif type == 2:
|
# 涨停代码
|
dataList, is_add = data_process.parseGPCode(_str)
|
# 设置涨停时间
|
gpcode_manager.set_limit_up_list(dataList)
|
# 保存到内存中
|
if dataList:
|
global_data_loader.add_limit_up_codes(dataList)
|
ths_industry_util.set_industry_hot_num(dataList)
|
# 保存涨停时间
|
gp_list = gpcode_manager.get_gp_list()
|
gp_code_set = set(gp_list)
|
now_str = tool.get_now_time_str()
|
if dataList:
|
for d in dataList:
|
if d["time"] == "00:00:00" or tool.get_time_as_second(now_str) < tool.get_time_as_second(
|
d["time"]):
|
continue
|
if d["code"] not in gp_code_set:
|
continue
|
|
# 获取是否有涨停时间
|
# if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
|
# limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
|
elif type == 22:
|
try:
|
if int(tool.get_now_time_str().replace(":", "")) < int("092500"):
|
raise Exception('未到接受时间')
|
# 首板代码
|
dataList, is_add = data_process.parseGPCode(_str)
|
limit_up_price_dict = {}
|
temp_codes = []
|
codes = []
|
tick_datas = []
|
if dataList:
|
for data in dataList:
|
code = data["code"]
|
codes.append(code)
|
|
# 保存未筛选的首板代码
|
new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
|
# 保存自由流通股本
|
if dataList:
|
zyltgb_list = []
|
for data in dataList:
|
code = data["code"]
|
if code in global_util.zyltgb_map:
|
continue
|
zyltgb_list.append(
|
{"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]})
|
if zyltgb_list:
|
ZYLTGBUtil.save_list(zyltgb_list)
|
global_data_loader.load_zyltgb()
|
|
bad_codes = set()
|
|
# 获取昨日收盘价
|
for code in codes:
|
# 如果涨停价是空值就需要设置昨日收盘价格
|
if gpcode_manager.get_limit_up_price(code) is None:
|
juejin.re_set_price_pres([code], True)
|
|
# 板块关键字准备
|
for code in codes:
|
if self.__TargetCodePlateKeyManager.get_history_limit_up_reason(code) is None:
|
self.__TargetCodePlateKeyManager.set_history_limit_up_reason(code,
|
KPLLimitUpDataRecordManager.get_latest_blocks_set(
|
code))
|
if self.__TargetCodePlateKeyManager.get_blocks(code) is None:
|
try:
|
results = kpl_api.getStockIDPlate(code)
|
bs = [r[1] for r in results]
|
self.__TargetCodePlateKeyManager.set_blocks(code, bs)
|
except Exception as e:
|
logging.exception(e)
|
pass
|
|
# 获取60天最大记录
|
for code in codes:
|
need_get_volumn = False
|
if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
|
need_get_volumn = True
|
if not need_get_volumn and code_nature_analyse.CodeNatureRecordManager.get_nature(
|
code) is None:
|
need_get_volumn = True
|
if need_get_volumn:
|
volumes_data = juejin.get_volumns_by_code(code, 150)
|
volumes = juejin.parse_max_volume(volumes_data[:90], code_nature_analyse.is_new_top(
|
gpcode_manager.get_limit_up_price(code), volumes_data[:90]))
|
logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
|
code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
|
# 判断K线形态
|
is_has_k_format, msg = code_nature_analyse.is_has_k_format(
|
gpcode_manager.get_limit_up_price(code), volumes_data)
|
if not is_has_k_format:
|
logger_first_code_record.info("{}首板K线形态不好,{}", code, msg)
|
# 股性不好,就不要加入
|
bad_codes.add(code)
|
# 加入禁止交易代码
|
l2_trade_util.forbidden_trade(code)
|
code_nature_analyse.set_record_datas(code,
|
gpcode_manager.get_limit_up_price(code),
|
volumes_data)
|
gpcode_manager.FirstCodeManager.add_record(codes)
|
if new_add_codes:
|
gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes))
|
# 加入首板历史记录
|
|
logger_first_code_record.info("新增首板:{}", new_add_codes)
|
|
# 移除代码
|
listen_codes = gpcode_manager.get_listen_codes()
|
for lc in listen_codes:
|
if not gpcode_manager.is_in_gp_pool(lc):
|
# 移除代码
|
l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
|
|
if new_add_codes:
|
# 低分值代码禁止交易
|
for code in new_add_codes:
|
try:
|
score, score_list = first_code_score_manager.get_score(code, 0, None, False)
|
if score < 0:
|
trade_manager.ForbiddenBuyCodeByScoreManager.add_code(code)
|
# elif score >= 200:
|
# # 如果没有涨停过
|
# limit_up_time = limit_up_time_manager.get_limit_up_time(code)
|
# if limit_up_time is None and int(tool.get_now_time_str().replace(":","")) > int("113000"):
|
# gpcode_manager.WantBuyCodesManager.add_code(code)
|
except Exception as e:
|
logging.exception(e)
|
|
# if dataList and int(tool.get_now_time_str().replace(":","")) > int("113000"):
|
# for data in dataList:
|
# code = data["code"]
|
# if gpcode_manager.WantBuyCodesManager.is_in(code):
|
# score, score_list = first_code_score_manager.get_score(code, 0, None, False)
|
# if score < 200:
|
# gpcode_manager.WantBuyCodesManager.remove_code(code)
|
|
# 保存现价
|
if dataList:
|
for data in dataList:
|
code = data["code"]
|
codes.append(code)
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price is not None:
|
limit_up_price_dict[code] = limit_up_price
|
else:
|
temp_codes.append(code)
|
tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
|
"volumeUnit": data["volumeUnit"]})
|
# 获取涨停价
|
if temp_codes:
|
# 获取涨停价
|
juejin.re_set_price_pres(temp_codes)
|
# 重新获取涨停价
|
for code in temp_codes:
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price is not None:
|
limit_up_price_dict[code] = limit_up_price
|
# 保存现价
|
self.first_tick_datas.clear()
|
self.first_tick_datas.extend(tick_datas)
|
|
# 首板数据加工
|
prices = []
|
for data in dataList:
|
code = data["code"]
|
price = data["price"]
|
limit_up_time = data["time"]
|
if limit_up_time == "00:00:00":
|
limit_up_time = None
|
if code not in limit_up_price_dict:
|
continue
|
is_limit_up = abs(float(limit_up_price_dict[code]) - float(price)) < 0.01
|
# 纠正数据
|
if is_limit_up and limit_up_time is None:
|
limit_up_time = tool.get_now_time_str()
|
if is_limit_up:
|
# 加入首板涨停
|
gpcode_manager.FirstCodeManager.add_limited_up_record([code])
|
pricePre = gpcode_manager.get_price_pre(code)
|
if pricePre is None:
|
juejin.re_set_price_pres([code])
|
|
rate = round((float(price) - pricePre) * 100 / pricePre, 1)
|
prices.append(
|
{"code": code, "time": limit_up_time, "rate": rate,
|
"limit_up": is_limit_up})
|
if code in new_add_codes:
|
if is_limit_up:
|
place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
|
code)
|
if place_order_count == 0:
|
trade_data_manager.placeordercountmanager.place_order(code)
|
|
gpcode_first_screen_manager.process_ticks(prices)
|
except Exception as e:
|
logging.exception(e)
|
|
elif type == 3:
|
# 交易成功信息
|
dataList = data_process.parseList(_str)
|
try:
|
trade_manager.process_trade_success_data(dataList)
|
except Exception as e:
|
logging.exception(e)
|
trade_manager.save_trade_success_data(dataList)
|
|
elif type == 5:
|
logger_trade_delegate.debug("接收到委托信息")
|
__start_time = round(time.time() * 1000)
|
try:
|
# 交易委托信息
|
dataList = data_process.parseList(_str)
|
if self.last_trade_delegate_data != _str:
|
self.last_trade_delegate_data = _str
|
# 保存委托信息
|
logger_trade_delegate.info(dataList)
|
try:
|
# 设置申报时间
|
for item in dataList:
|
apply_time = item["apply_time"]
|
if apply_time and len(apply_time) >= 8:
|
code = item["code"]
|
trade_state = trade_manager.get_trade_state(code)
|
# 设置下单状态的代码为已委托
|
if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
|
origin_apply_time = apply_time
|
apply_time = apply_time[0:6]
|
apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4],
|
apply_time[4:6])
|
ms = origin_apply_time[6:9]
|
if int(ms) > 500:
|
# 时间+1s
|
apply_time = tool.trade_time_add_second(apply_time, 1)
|
|
print(apply_time)
|
except Exception as e:
|
logging.exception(e)
|
|
try:
|
trade_manager.process_trade_delegate_data(dataList)
|
except Exception as e:
|
logging.exception(e)
|
trade_manager.save_trade_delegate_data(dataList)
|
# 刷新交易界面
|
trade_gui.THSGuiTrade().refresh_data()
|
finally:
|
pass
|
|
elif type == 4:
|
# 行业代码信息
|
dataList = data_process.parseList(_str)
|
codes = []
|
for datas in dataList:
|
for d in datas:
|
name = ths_industry_util.get_name_by_code(d['code'])
|
if not name or name == 'None':
|
codes.append(d["code"])
|
# 根据代码获取代码名称
|
codes_name = {}
|
if codes:
|
codes_name = juejin.JueJinManager.get_gp_codes_names(codes)
|
ths_industry_util.save_industry_code(dataList, codes_name)
|
elif type == 6:
|
# 可用金额
|
datas = data_process.parseData(_str)
|
client = datas["client"]
|
money = datas["money"]
|
# TODO存入缓存文件
|
trade_manager.set_available_money(client, money)
|
# l2交易队列
|
elif type == 10:
|
# 可用金额
|
__start_time = time.time()
|
datas = data_process.parseData(_str)
|
channel = datas["channel"]
|
code = datas["code"]
|
msg = ""
|
try:
|
|
if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code):
|
# 没在目标代码中且没有在首板今日历史代码中
|
raise Exception("代码没在监听中")
|
|
data = datas["data"]
|
buy_time = data["buyTime"]
|
buy_one_price = data["buyOnePrice"]
|
buy_one_volumn = data["buyOneVolumn"]
|
sell_one_price = data["sellOnePrice"]
|
sell_one_volumn = data["sellOneVolumn"]
|
|
buy_queue = data["buyQueue"]
|
if buy_one_price is None:
|
print('买1价没有,', code)
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
|
if limit_up_price is not None:
|
code_price_manager.Buy1PriceManager.process(code, buy_one_price, buy_time, limit_up_price,
|
sell_one_price, sell_one_volumn)
|
_start_time = time.time()
|
msg += "买1价格处理:" + f"{_start_time - __start_time} "
|
|
buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price,
|
buy_time,
|
buy_queue)
|
msg += "买队列保存:" + f"{time.time() - _start_time} "
|
_start_time = time.time()
|
|
if buy_queue_result_list:
|
|
# 有数据
|
try:
|
buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
|
decimal.Decimal("0.00"))
|
# 获取执行位时间
|
|
buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
|
code)
|
if True:
|
# 只有下单过后才获取交易进度
|
exec_time = None
|
try:
|
if buy_exec_index:
|
exec_time = \
|
l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
|
"time"]
|
except:
|
pass
|
buy_progress_index = self.tradeBuyQueue.compute_traded_index(code,
|
buy_one_price_,
|
buy_queue_result_list,
|
exec_time)
|
if buy_progress_index is not None:
|
HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
|
buy_progress_index,
|
l2.l2_data_util.local_today_datas.get(
|
code),
|
l2.l2_data_util.local_today_num_operate_map.get(
|
code))
|
logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code,
|
buy_progress_index,
|
json.dumps(buy_queue_result_list))
|
# 计算大单成交额
|
deal_big_money_manager.set_trade_progress(code, buy_progress_index,
|
l2.l2_data_util.local_today_datas.get(
|
code),
|
l2.l2_data_util.local_today_num_operate_map.get(
|
code))
|
|
else:
|
raise Exception("暂未获取到交易进度")
|
msg += "计算成交进度:" + f"{time.time() - _start_time} "
|
_start_time = time.time()
|
except Exception as e:
|
logging.exception(e)
|
print("买入队列", code, buy_queue_result_list)
|
logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{} 数据-{}", code, str(e),
|
json.dumps(buy_queue_result_list))
|
|
# buy_queue是否有变化
|
if self.l2_trade_buy_queue_dict.get(
|
code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
|
code):
|
self.l2_trade_buy_queue_dict[code] = buy_queue
|
logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
|
msg += "保存记录日志:" + f"{time.time() - _start_time} "
|
_start_time = time.time()
|
# 保存最近的记录
|
if self.ths_l2_trade_queue_manager.save_recod(code, data):
|
if buy_time != "00:00:00":
|
logger_l2_trade_queue.info("{}-{}", code, data)
|
need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
|
int(buy_one_volumn),
|
buy_one_price)
|
# if need_sync:
|
# # 同步数据
|
# s = time.time()
|
# L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
|
# msg += "量校验:"+f"{time.time()-s} "
|
# print(buy_time, buy_one_price, buy_one_volumn)
|
|
# print("L2买卖队列",datas)
|
msg += "买1处理:" + f"{time.time() - _start_time} "
|
_start_time = time.time()
|
except:
|
pass
|
finally:
|
space = time.time() - __start_time
|
if space > 0.1:
|
logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg)
|
|
elif type == 20:
|
# 登录
|
data = data_process.parse(_str)["data"]
|
try:
|
client_id, _authoritys = authority.login(data["account"], data["pwd"])
|
return_str = data_process.toJson(
|
{"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
|
except Exception as e:
|
return_str = data_process.toJson({"code": 1, "msg": str(e)})
|
# 现价更新
|
elif type == 40:
|
datas = data_process.parse(_str)["data"]
|
if datas is None:
|
datas = []
|
print("二板现价")
|
# 获取暂存的二版现价数据
|
if self.first_tick_datas:
|
datas.extend(self.first_tick_datas)
|
if datas is not None:
|
print("二板现价数量", len(datas))
|
for item in datas:
|
volumn = item["volume"]
|
volumnUnit = item["volumeUnit"]
|
code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
|
current_price_process_manager.accept_prices(datas)
|
elif type == 50:
|
data = data_process.parse(_str)["data"]
|
if data is not None:
|
index = data["index"]
|
code_name = data["codeName"].replace(" ", "")
|
volumn = data["volumn"]
|
price = data["price"]
|
time_ = data["time"]
|
code = global_util.name_codes.get(code_name)
|
if code is None:
|
global_data_loader.load_name_codes()
|
code = global_util.name_codes.get(code_name)
|
if code is not None:
|
# 记录日志
|
if self.latest_buy1_volumn_dict.get(code) != "{}-{}".format(volumn, price):
|
# 记录数据
|
logger_buy_1_volumn_record.info("{}-{}", code, data)
|
self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price)
|
# 校正时间
|
time_ = tool.compute_buy1_real_time(time_)
|
# 保存数据
|
need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
|
price)
|
# if need_cancel:
|
# l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
|
if need_sync:
|
# 同步数据
|
L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
|
elif type == 30:
|
# 心跳信息
|
data = data_process.parse(_str)["data"]
|
client_id = data["client"]
|
thsDead = data.get("thsDead")
|
logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
|
client_manager.saveClientActive(int(client_id), host, thsDead)
|
if ths_util.is_ths_dead(client_id):
|
# TODO 重启同花顺
|
# 报警
|
l2_clients = authority.get_l2_clients()
|
if client_id in l2_clients:
|
alert_util.alarm()
|
elif type == 60:
|
# L2自启动成功
|
data = data_process.parse(_str)["data"]
|
client_id = data["client"]
|
print("L2自启动成功", client_id)
|
now_str = tool.get_now_time_str()
|
ts = tool.get_time_as_second(now_str)
|
# 9点25到9点28之间的自启动就需要批量设置代码,目前永远不执行
|
if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False:
|
# 准备批量设置代码
|
return_json = {"code": 1, "msg": "等待批量设置代码"}
|
return_str = json.dumps(return_json)
|
# 获取排名前16位的代码
|
codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16)
|
codes = sorted(codes)
|
if client_id == 2:
|
codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE]
|
else:
|
codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:]
|
codes_datas = []
|
for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
|
if i >= len(codes):
|
break
|
codes_datas.append((i, codes[i]))
|
# 如果设置失败需要重试2次
|
for i in range(0, 3):
|
set_success = l2_code_operate.betch_set_client_codes(client_id, codes_datas)
|
if set_success:
|
break
|
else:
|
time.sleep(3)
|
else:
|
return_json = {"code": 0, "msg": "开启在线状态"}
|
return_str = json.dumps(return_json)
|
elif type == 70:
|
# 选股宝热门概念
|
data_json = data_process.parse(_str)
|
day = data_json["day"]
|
datas = data_json["data"]
|
if datas:
|
hot_block_data_process.save_datas(day, datas)
|
print(datas)
|
elif type == 71:
|
# 根据代码获取选股宝热门概念
|
day = tool.get_now_date_str()
|
code = data_process.parse(_str)["data"]["code"]
|
__start_time = time.time()
|
final_data = {'code': code, 'data': code_info_output.get_output_html(code)}
|
return_str = json.dumps({"code": 0, "data": final_data})
|
print("代码信息获取时间", code, round((time.time() - __start_time) * 1000))
|
pass
|
# 获取最近2个交易日涨停代码
|
elif type == 72:
|
day = tool.get_now_date_str()
|
data_dict = {}
|
for i in range(0, 2):
|
day = juejin.JueJinManager.get_previous_trading_date(day)
|
data_list = list(block_info.KPLLimitUpDataRecordManager.list_all(day))
|
codes_set = set()
|
if data_list:
|
for d in data_list:
|
if len(d[4]) > 6:
|
codes_set.add(d[3])
|
data_dict[day] = list(codes_set)
|
return_str = json.dumps({"code": 0, "data": data_dict})
|
elif type == 80:
|
# 撤单
|
data = json.loads(_str)
|
code = data["data"]["code"]
|
if code:
|
return_str = json.dumps({"code": 0})
|
try:
|
l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
|
except Exception as e:
|
return_str = json.dumps({"code": 2, "msg": str(e)})
|
else:
|
return_str = json.dumps({"code": 1, "msg": "请上传代码"})
|
elif type == 201:
|
# 加入黑名单
|
data = json.loads(_str)
|
codes = data["data"]["codes"]
|
for code in codes:
|
l2_trade_util.forbidden_trade(code)
|
name = gpcode_manager.get_code_name(code)
|
if not name:
|
results = juejin.JueJinManager.get_gp_codes_names([code])
|
if results:
|
gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
|
|
return_str = json.dumps({"code": 0})
|
elif type == 202:
|
# 加入白名单
|
data = json.loads(_str)
|
codes = data["data"]["codes"]
|
try:
|
for code in codes:
|
# 自由流通市值>50亿,股价高于30块的不能加白名单
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if float(limit_up_price) > 30:
|
raise Exception("股价高于30元")
|
# zyltgb = global_util.zyltgb_map.get(code)
|
# if zyltgb is None:
|
# global_data_loader.load_zyltgb()
|
# zyltgb = global_util.zyltgb_map.get(code)
|
# if zyltgb > 50 * 100000000:
|
# raise Exception("自由流通股本大于50亿")
|
|
l2_trade_util.WhiteListCodeManager.add_code(code)
|
name = gpcode_manager.get_code_name(code)
|
if not name:
|
results = juejin.JueJinManager.get_gp_codes_names([code])
|
if results:
|
gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
|
return_str = json.dumps({"code": 0})
|
except Exception as e:
|
return_str = json.dumps({"code": 1, "msg": str(e)})
|
|
elif type == 203:
|
# 移除黑名单
|
data = json.loads(_str)
|
codes = data["data"]["codes"]
|
for code in codes:
|
l2_trade_util.remove_from_forbidden_trade_codes(code)
|
return_str = json.dumps({"code": 0})
|
elif type == 204:
|
# 移除白名单
|
data = json.loads(_str)
|
codes = data["data"]["codes"]
|
for code in codes:
|
l2_trade_util.WhiteListCodeManager.remove_code(code)
|
return_str = json.dumps({"code": 0})
|
elif type == 301:
|
# 黑名单列表
|
codes = l2_trade_util.BlackListCodeManager.list_codes()
|
datas = []
|
for code in codes:
|
name = gpcode_manager.get_code_name(code)
|
datas.append(f"{name}:{code}")
|
return_str = json.dumps({"code": 0, "data": datas})
|
elif type == 302:
|
# 黑名单列表
|
codes = l2_trade_util.WhiteListCodeManager.list_codes()
|
datas = []
|
for code in codes:
|
name = gpcode_manager.get_code_name(code)
|
datas.append(f"{name}:{code}")
|
return_str = json.dumps({"code": 0, "data": datas})
|
elif type == 401:
|
# 加入想要买
|
data = json.loads(_str)
|
codes = data["data"]["codes"]
|
for code in codes:
|
gpcode_manager.WantBuyCodesManager.add_code(code)
|
name = gpcode_manager.get_code_name(code)
|
if not name:
|
results = juejin.JueJinManager.get_gp_codes_names([code])
|
if results:
|
gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
|
if "plates" in data["data"]:
|
for i in range(len(data["data"]["plates"])):
|
self.__KPLCodeLimitUpReasonManager.save_reason(codes[i], data["data"]["plates"][i])
|
|
return_str = json.dumps({"code": 0})
|
elif type == 402:
|
data = json.loads(_str)
|
codes = data["data"]["codes"]
|
for code in codes:
|
gpcode_manager.WantBuyCodesManager.remove_code(code)
|
return_str = json.dumps({"code": 0})
|
elif type == 403:
|
plate = None
|
include_codes = set()
|
if _str:
|
data = json.loads(_str)
|
plate = data.get("plate")
|
if plate:
|
code_map = self.__KPLCodeLimitUpReasonManager.list_all()
|
for k in code_map:
|
if code_map[k] == plate:
|
include_codes.add(k)
|
|
codes = gpcode_manager.WantBuyCodesManager.list_code()
|
datas = []
|
for code in codes:
|
if plate and plate != '其他' and code not in include_codes:
|
continue
|
name = gpcode_manager.get_code_name(code)
|
datas.append(f"{name}:{code}")
|
|
return_str = json.dumps({"code": 0, "data": datas})
|
elif type == 501:
|
data = json.loads(_str)
|
is_open = data["data"]["open"]
|
if is_open:
|
trade_manager.TradeStateManager.open_buy()
|
else:
|
trade_manager.TradeStateManager.close_buy()
|
return_str = json.dumps({"code": 0, "msg": ("开启成功" if is_open else "关闭成功")})
|
elif type == 502:
|
can_buy = trade_manager.TradeStateManager.is_can_buy()
|
return_str = json.dumps({"code": 0, "data": {"can_buy": can_buy}})
|
elif type == 601:
|
pass
|
# 加自选
|
elif type == 602:
|
pass
|
# 移除自选
|
|
sk.send(return_str.encode())
|
|
# print("----------handler end ----------")
|
|
def finish(self):
|
super().finish() # 可以不调用父类的finish(),方法,父类的finish方法什么都没做
|
# print("--------finish方法被执行---")
|
|
|
def send_msg(client_id, data):
|
_ip = client_manager.getActiveClientIP(client_id)
|
print("ip", client_id, _ip)
|
if _ip is None or len(_ip) <= 0:
|
raise Exception("客户端IP为空")
|
socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
socketClient.connect((_ip, 9006))
|
# 连接socket
|
try:
|
socketClient.send(json.dumps(data).encode())
|
recv = socketClient.recv(1024)
|
result = str(recv, encoding="gbk")
|
return result
|
finally:
|
socketClient.close()
|
|
|
# 客户端心跳机制
|
def test_client_server():
|
while True:
|
clients = authority.get_l2_clients()
|
for client in clients:
|
print("心跳", client)
|
try:
|
send_msg(client, {"action": "test"})
|
except:
|
pass
|
# 矫正客户端代码
|
l2_code_operate.correct_client_codes()
|
time.sleep(5)
|
|
|
# 获取采集客户端的状态
|
def get_client_env_state(client):
|
result = send_msg(client, {"action": "getEnvState"})
|
result = json.loads(result)
|
if result["code"] == 0:
|
return json.loads(result["data"])
|
else:
|
raise Exception(result["msg"])
|
|
|
# 修复采集客户端
|
def repair_client_env(client):
|
result = send_msg(client, {"action": "repairEnv"})
|
result = json.loads(result)
|
if result["code"] != 0:
|
raise Exception(result["msg"])
|
|
|
# 同步目标标的到同花顺
|
def sync_target_codes_to_ths():
|
codes = gpcode_manager.get_second_gp_list()
|
code_list = []
|
for code in codes:
|
code_list.append(code)
|
client = authority._get_client_ids_by_rule("data-maintain")
|
result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})
|
return result
|
|
|
# 修复同花顺主站
|
def repair_ths_main_site(client):
|
result = send_msg(client, {"action": "updateTHSSite"})
|
result = json.loads(result)
|
if result["code"] != 0:
|
raise Exception(result["msg"])
|
else:
|
# 测速成功
|
client_infos = []
|
for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
|
client_infos.append((client, index))
|
l2_listen_pos_health_manager.init_all(client_infos)
|
|
|
if __name__ == "__main__1":
|
cid, pid = gpcode_manager.get_listen_code_pos("000070")
|
print(cid, pid)
|
|
if __name__ == "__main__":
|
codes = gpcode_manager.get_first_gp_codes()
|
for code in codes:
|
try:
|
global_data_loader.load_zyltgb()
|
limit_up_price = float(gpcode_manager.get_limit_up_price(code))
|
volumes_data = juejin.get_volumns_by_code(code, 150)
|
volumes_data = volumes_data[1:]
|
volumes = juejin.parse_max_volume(volumes_data[:60],
|
code_nature_analyse.is_new_top(limit_up_price,
|
volumes_data[:60]))
|
logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
|
code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
|
# 判断K线形态
|
k_format = code_nature_analyse.get_k_format(
|
limit_up_price, volumes_data)
|
print(k_format)
|
|
code_nature_analyse.set_record_datas(code,
|
gpcode_manager.get_limit_up_price(code),
|
volumes_data)
|
except:
|
pass
|
|
# code_nature_analyse.set_record_datas(code,
|
# limit_up_price,
|
# volumes_data)
|