"""
|
接受客户端数据的服务器
|
"""
|
import decimal
|
import json
|
import logging
|
import random
|
import socketserver
|
import socket
|
import threading
|
import time
|
|
from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer
|
from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util
|
from code_attribute import code_volumn_manager, global_data_loader, gpcode_manager, first_target_code_data_processor
|
import constant
|
from user import authority
|
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
|
import l2_data_util
|
import l2.l2_data_util
|
|
from third_data import block_info
|
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
|
from third_data.history_k_data_util import HistoryKDatasUtils
|
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager
|
from ths import l2_listen_pos_health_manager, l2_code_operate, client_manager
|
from trade import trade_data_manager, trade_manager, l2_trade_util, \
|
current_price_process_manager, trade_juejin
|
from code_attribute.code_data_util import ZYLTGBUtil
|
import l2.transaction_progress
|
|
from log_module.log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
|
logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_debug
|
from trade.huaxin import huaxin_trade_record_manager
|
from trade.trade_manager import TradeTargetCodeModeManager
|
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
|
|
# Modules loaded by dotted path through import_util.import_lib rather than a
# plain import. NOTE(review): import_lib presumably returns None when the
# module is unavailable (handle() checks `trade_gui is not None`) - confirm.
ths_util, trade_gui = (
    import_util.import_lib(module_path)
    for module_path in ("ths.ths_util", "trade.trade_gui")
)
|
|
|
class MyTCPServer(socketserver.TCPServer):
    """TCP server that carries two extra pipe handles for its request handlers.

    Handlers reach the pipes through ``self.server.pipe_trade`` and
    ``self.server.pipe_ui``.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_trade=None, pipe_ui=None):
        """Store the pipes, initialise ``block_info``, then set up the TCP server.

        Args:
            server_address: address passed through to ``socketserver.TCPServer``.
            RequestHandlerClass: handler class instantiated per connection.
            bind_and_activate: passed through to ``socketserver.TCPServer``.
            pipe_trade: extra parameter; used by handlers to notify the trade side.
            pipe_ui: extra parameter; stored for handlers, not used in this class.
        """
        # Extra parameters on top of the stock TCPServer signature.
        self.pipe_trade = pipe_trade
        self.pipe_ui = pipe_ui
        # Initialise data before the socket is bound/activated.
        block_info.init()
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
|
|
|
# 如果使用异步的形式则需要再重写ThreadingTCPServer
|
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """``MyTCPServer`` combined with ``socketserver.ThreadingMixIn``."""
|
|
|
# 首板tick级数据
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler that dispatches client messages on their numeric type.

    The dict/list/manager attributes below are class attributes, so they are
    shared by every handler instance (one instance is created per connection).

    NOTE(review): this class was reconstructed from a listing whose
    indentation had been lost. Block nesting was inferred from statement
    order; spots where more than one nesting was plausible carry their own
    NOTE(review) comment. Verify those against the original file.
    """

    # "<client>-<channel>-<code>" -> ms timestamp of the last handled price error.
    l2_data_error_dict = {}
    # Raw text of the last order-delegation message (used to skip duplicates).
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
    ths_l2_trade_queue_manager = thsl2tradequeuemanager()

    latest_buy1_volumn_dict = {}
    l2_trade_queue_time_dict = {}
    # code -> ms timestamp of the last "latest L2 row" log line.
    l2_save_time_dict = {}
    # code -> last buy queue that was logged.
    l2_trade_buy_queue_dict = {}
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    # Tick data kept from the last type-22 message; merged into type-40 data.
    first_tick_datas = []
    latest_oringin_data = {}
    # (client, channel) -> ms timestamp of the last health-state update.
    last_l2_listen_health_time = {}
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    # Current prices collected on the L2 monitor (type 41), keyed by code.
    __l2_current_price_data = {}

    def setup(self):
        """Run the base-class setup and fetch the shared L2CodeOperate instance."""
        super().setup()
        self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()

    def __notify_trade(self, type_):
        """Send ``{"type": type_}`` as JSON over the server's trade pipe, if set."""
        if self.server.pipe_trade:
            self.server.pipe_trade.send(json.dumps({"type": type_}))

    @staticmethod
    def _ensure_code_name(code):
        """Fetch and register the name of *code* when no name is known for it."""
        if gpcode_manager.get_code_name(code):
            return
        results = HistoryKDatasUtils.get_gp_codes_names([code])
        if results:
            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])

    @staticmethod
    def _format_code_names(codes):
        """Return ``"<name>:<code>"`` strings for *codes*, in iteration order."""
        return [f"{gpcode_manager.get_code_name(code)}:{code}" for code in codes]

    def handle(self):
        """Serve one client connection until the peer closes it.

        Each received message is decoded as GBK. A message starting with
        ``"##"`` carries an 8-character length field (characters 2-9); more
        data is read until the payload reaches that length. The message type
        returned by ``data_process.parseType`` selects the branch below, and
        the branch's ``return_str`` (default ``"OK"``) is sent back.
        """
        host = self.client_address[0]
        super().handle()
        sk: socket.socket = self.request
        while True:
            data = sk.recv(1024 * 100)
            if len(data) == 0:
                # The client closed the connection.
                break
            _str = str(data, encoding="gbk")
            if not _str:
                continue

            type_ = -1
            try:
                # Message with a length header.
                if _str.startswith("##"):
                    total_length = int(_str[2:10])
                    _str = _str[10:]
                    # Keep reading until the whole message has arrived.
                    # NOTE(review): total_length is compared with the decoded
                    # character count, and chunks are decoded independently -
                    # confirm the header counts characters and that a
                    # multi-byte GBK character cannot straddle two chunks.
                    while total_length > len(_str):
                        d = sk.recv(1024 * 100)
                        if not d:
                            # Peer closed mid-message: without this exit the
                            # loop would spin forever on empty reads.
                            logging.warning("client %s closed the connection before the full message arrived", host)
                            return
                        _str += d.decode(encoding='gbk')
                type_ = data_process.parseType(_str)
            except Exception as e:
                print("接受到的异常数据:", f"{_str[:10]}...{_str[-10:]}")
                if str(e).find("Unterminated string starting") > -1:
                    # Retry once with embedded newlines removed.
                    _str = _str.replace("\n", "")
                    type_ = data_process.parseType(_str)
                else:
                    print(_str)

            return_str = "OK"
            if type_ == 0:
                # Level-2 order-book data.
                try:
                    origin_start_time = round(time.time() * 1000)
                    __start_time = round(time.time() * 1000)

                    day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = \
                        l2.l2_data_util.parseL2Data(_str)
                    last_health_time = self.last_l2_listen_health_time.get((client, channel))
                    # ---- L2 listen-position health, updated at most once per second ----
                    if last_health_time is None or __start_time - last_health_time > 1000:
                        self.last_l2_listen_health_time[(client, channel)] = __start_time
                        if origin_datas_count == 0:
                            l2_listen_pos_health_manager.set_unhealthy(client, channel)
                        else:
                            l2_listen_pos_health_manager.set_healthy(client, channel)

                    l2_log.threadIds[code] = random.randint(0, 100000)
                    # Log the last L2 row at most once per second per code.
                    # The emptiness check guards both alternatives: the
                    # original `a or b and c` indexed an empty list for a
                    # first-seen code.
                    if origin_datas and (code not in self.l2_save_time_dict
                                         or origin_start_time - self.l2_save_time_dict[code] >= 1000):
                        self.l2_save_time_dict[code] = origin_start_time
                        logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1])

                    # Allow 10 ms for network transfer.
                    capture_timestamp = __start_time - process_time - 10
                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                       "截图时间:{} 数据解析时间".format(process_time))

                    cid, pid = gpcode_manager.get_listen_code_pos(code)

                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                       "l2获取代码位置耗时")
                    # Only process when the code's registered position matches the uploader.
                    if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                        # Save the number of L2 rows.
                        l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
                        if not origin_datas:
                            raise Exception("无新增数据")
                        # Keep the most recent data.
                        self.latest_oringin_data[code] = origin_datas
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price)
                        try:
                            # Verify the client's code/position info.
                            l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                            __start_time = round(time.time() * 1000)
                            if gpcode_manager.is_listen(code):
                                __start_time = l2_data_log.l2_time(code,
                                                                   round(time.time() * 1000) - __start_time,
                                                                   "l2外部数据预处理耗时")
                                l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                __start_time = l2_data_log.l2_time(code,
                                                                   round(time.time() * 1000) - __start_time,
                                                                   "l2数据有效处理外部耗时",
                                                                   False)
                        except l2_data_manager.L2DataException as l2_err:
                            # Unit price mismatch.
                            if l2_err.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                key = "{}-{}-{}".format(client, channel, code)
                                # React at most once every 10 s per (client, channel, code).
                                if key not in self.l2_data_error_dict or round(
                                        time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                    logger_l2_error.warning("code-{} l2单价错误:{}", code, l2_err.msg)
                                    # On a price mismatch the code is removed so it can be re-added.
                                    l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                    self.l2_data_error_dict[key] = round(time.time() * 1000)
                        except Exception as e:
                            print("异常", str(e), code)
                            logging.exception(e)
                            logger_l2_error.error("出错:{}".format(str(e)))
                            logger_l2_error.error("内容:{}".format(_str))
                        finally:
                            __end_time = round(time.time() * 1000)
                            # Only record runs that took longer than 100 ms.
                            if __end_time - origin_start_time > 100:
                                l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                                    "l2数据处理总耗时",
                                                    True)
                except Exception as e:
                    # "无新增数据" is an expected control-flow signal; anything
                    # else is a real error. The original tested the truthiness
                    # of str.find(), which hid every exception.
                    if "新增数据" not in str(e):
                        logger_l2_error.exception(e)

            elif type_ == 1:
                # Set stock codes.
                data_list, is_add = data_process.parseGPCode(_str)
                ZYLTGBUtil.save_list(data_list)
                code_list = [item["code"] for item in data_list]
                # Fetch basic info (result currently unused; call kept as in the original).
                code_datas = HistoryKDatasUtils.get_gp_latest_info(code_list)
                if not is_add:
                    # Sync the target codes to THS in the background.
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths(), daemon=True)
                    t1.start()

            elif type_ == 2:
                # Limit-up codes.
                dataList, is_add = data_process.parseGPCode(_str)
                # Set limit-up times.
                gpcode_manager.set_limit_up_list(dataList)
                # Keep in memory.
                if dataList:
                    global_data_loader.add_limit_up_codes(dataList)
                # NOTE(review): reconstructed as unconditional; it may have
                # been inside the `if dataList:` above - confirm.
                ths_industry_util.set_industry_hot_num(dataList)
                gp_list = gpcode_manager.get_gp_list()
                gp_code_set = set(gp_list)
                now_str = tool.get_now_time_str()
                if dataList:
                    # The per-item action was commented out in the original;
                    # only the filtering remains.
                    for d in dataList:
                        if d["time"] == "00:00:00" or tool.get_time_as_second(now_str) < tool.get_time_as_second(
                                d["time"]):
                            continue
                        if d["code"] not in gp_code_set:
                            continue

            elif type_ == 22:
                # First-board codes.
                print("---接受到首板代码")
                try:
                    if int(tool.get_now_time_str().replace(":", "")) < int("092500"):
                        raise Exception('未到接受时间')
                    dataList, is_add = data_process.parseGPCode(_str)
                    tick_datas = first_target_code_data_processor.process_first_codes_datas(dataList)
                    # Keep the current prices for the next type-40 message.
                    self.first_tick_datas.clear()
                    self.first_tick_datas.extend(tick_datas)
                except Exception as e:
                    logging.exception(e)
                finally:
                    print("首板代码处理完毕:")
                    return_str = socket_util.load_header(json.dumps({"code": 0}).encode("utf-8")).decode("utf-8")

            elif type_ == 3:
                # Trade-success records.
                dataList = data_process.parseList(_str)
                try:
                    trade_manager.process_trade_success_data(dataList)
                except Exception as e:
                    logging.exception(e)
                # NOTE(review): reconstructed as running after the try/except
                # (not only on failure) - confirm.
                trade_manager.save_trade_success_data(dataList)

            elif type_ == 5:
                # Order-delegation records.
                logger_trade_delegate.debug("接收到委托信息")
                __start_time = round(time.time() * 1000)
                dataList = data_process.parseList(_str)
                if self.last_trade_delegate_data != _str:
                    self.last_trade_delegate_data = _str
                    # Record the delegation data.
                    logger_trade_delegate.info(dataList)
                    try:
                        # Derive the declaration time of codes in "order placed" state.
                        for item in dataList:
                            apply_time = item["apply_time"]
                            if apply_time and len(apply_time) >= 8:
                                code = item["code"]
                                trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                    origin_apply_time = apply_time
                                    apply_time = apply_time[0:6]
                                    apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4],
                                                                   apply_time[4:6])
                                    ms = origin_apply_time[6:9]
                                    if int(ms) > 500:
                                        # Round up to the next second.
                                        apply_time = tool.trade_time_add_second(apply_time, 1)
                                    # NOTE(review): nesting of this print was inferred.
                                    print(apply_time)
                    except Exception as e:
                        logging.exception(e)

                    try:
                        trade_manager.process_trade_delegate_data(dataList)
                    except Exception as e:
                        logging.exception(e)
                    trade_manager.save_trade_delegate_data(dataList)
                # Refresh the trade UI.
                # NOTE(review): reconstructed as running for every type-5
                # message, not only when the data changed - confirm.
                if trade_gui is not None:
                    trade_gui.THSGuiTrade().refresh_data()

            elif type_ == 4:
                # Industry/code info.
                dataList = data_process.parseList(_str)
                codes = []
                for datas in dataList:
                    for d in datas:
                        name = ths_industry_util.get_name_by_code(d['code'])
                        if not name or name == 'None':
                            codes.append(d["code"])
                # Look up the names of codes that have none yet.
                codes_name = {}
                if codes:
                    codes_name = HistoryKDatasUtils.get_gp_codes_names(codes)
                ths_industry_util.save_industry_code(dataList, codes_name)

            elif type_ == 6:
                # Available money.
                datas = data_process.parseData(_str)
                client = datas["client"]
                money = datas["money"]
                # TODO: persist to a cache file.
                trade_manager.AccountAvailableMoneyManager().set_available_money(client, money)

            elif type_ == 10:
                # L2 trade queue.
                __start_time = time.time()
                datas = data_process.parseData(_str)
                channel = datas["channel"]
                code = datas["code"]
                msg = ""
                try:
                    if not gpcode_manager.is_in_gp_pool(
                            code) and not gpcode_manager.FirstGPCodesManager().is_in_first_gp_codes_cache(code):
                        # Neither a target code nor one of today's first-board codes.
                        raise Exception("代码没在监听中")

                    data = datas["data"]
                    buy_time = data["buyTime"]
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    sell_one_price = data["sellOnePrice"]
                    sell_one_volumn = data["sellOneVolumn"]

                    buy_queue = data["buyQueue"]
                    if buy_one_price is None:
                        print('买1价没有,', code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)

                    # NOTE(review): everything down to the "买1处理" timing is
                    # reconstructed as nested under this check (the timing
                    # variable _start_time is first assigned here) - confirm.
                    if limit_up_price is not None:
                        code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_one_volumn, buy_time,
                                                                      limit_up_price,
                                                                      sell_one_price, sell_one_volumn)
                        _start_time = time.time()
                        msg += "买1价格处理:" + f"{_start_time - __start_time} "

                        buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price,
                                                                        buy_time,
                                                                        buy_queue)
                        msg += "买队列保存:" + f"{time.time() - _start_time} "
                        _start_time = time.time()

                        if buy_queue_result_list:
                            # There is queue data.
                            try:
                                buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                    decimal.Decimal("0.00"))
                                order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                                    code)
                                # Time of the row at the buy-execution index, when one exists.
                                exec_time = None
                                try:
                                    if order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1:
                                        exec_time = \
                                            l2.l2_data_util.local_today_datas.get(code)[
                                                order_begin_pos.buy_exec_index]["val"]["time"]
                                except Exception:
                                    # Best effort: continue without an execution time.
                                    pass
                                buy_progress_index = self.tradeBuyQueue.compute_traded_index(code,
                                                                                             buy_one_price_,
                                                                                             buy_queue_result_list,
                                                                                             exec_time)
                                if buy_progress_index is not None:
                                    LCancelBigNumComputer().set_trade_progress(code,
                                                                               order_begin_pos.buy_single_index,
                                                                               buy_progress_index,
                                                                               l2.l2_data_util.local_today_datas.get(
                                                                                   code))

                                    logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code,
                                                                   buy_progress_index,
                                                                   json.dumps(buy_queue_result_list))
                                else:
                                    raise Exception("暂未获取到交易进度")
                                msg += "计算成交进度:" + f"{time.time() - _start_time} "
                                _start_time = time.time()
                            except Exception as e:
                                logging.exception(e)
                                print("买入队列", code, buy_queue_result_list)
                                logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{} 数据-{}", code,
                                                                  str(e),
                                                                  json.dumps(buy_queue_result_list))

                        # Log the buy queue only when it changed.
                        if self.l2_trade_buy_queue_dict.get(
                                code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
                                code):
                            self.l2_trade_buy_queue_dict[code] = buy_queue
                            logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                        msg += "保存记录日志:" + f"{time.time() - _start_time} "
                        _start_time = time.time()
                        # Save the latest record.
                        if self.ths_l2_trade_queue_manager.save_recod(code, data):
                            if buy_time != "00:00:00":
                                logger_l2_trade_queue.info("{}-{}", code, data)
                                need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                                   int(buy_one_volumn),
                                                                                                   buy_one_price)
                        msg += "买1处理:" + f"{time.time() - _start_time} "
                        _start_time = time.time()
                except Exception:
                    # Deliberately silent: "代码没在监听中" is raised above as
                    # control flow and this path is hit often. Narrowed from a
                    # bare except so KeyboardInterrupt/SystemExit propagate.
                    pass
                finally:
                    space = time.time() - __start_time
                    if space > 0.1:
                        logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg)

            elif type_ == 20:
                # Login.
                data = data_process.parse(_str)["data"]
                try:
                    client_id, _authoritys = authority.login(data["account"], data["pwd"])
                    return_str = data_process.toJson(
                        {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
                except Exception as e:
                    return_str = data_process.toJson({"code": 1, "msg": str(e)})

            elif type_ == 40:
                # Current-price update.
                datas = data_process.parse(_str)["data"]
                if datas is None:
                    datas = []
                print("二板现价")
                # Merge in the tick data kept from the last type-22 message.
                if self.first_tick_datas:
                    datas.extend(self.first_tick_datas)
                print("二板现价数量", len(datas))
                for item in datas:
                    volumn = item["volume"]
                    volumnUnit = item["volumeUnit"]
                    code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                current_price_process_manager.accept_prices(datas)

            elif type_ == 41:
                # L2 current-price update.
                datas = data_process.parse(_str)["data"]
                if datas:
                    for d in datas:
                        code = d["code"]
                        self.__l2_current_price_data[code] = d

            elif type_ == 50:
                # Buy-1 volume reported by code name.
                data = data_process.parse(_str)["data"]
                if data is not None:
                    index = data["index"]
                    code_name = data["codeName"].replace(" ", "")
                    volumn = data["volumn"]
                    price = data["price"]
                    time_ = data["time"]
                    code = global_util.name_codes.get(code_name)
                    if code is None:
                        # Reload the name->code map once and retry.
                        global_data_loader.load_name_codes()
                        code = global_util.name_codes.get(code_name)
                    if code is not None:
                        # Only act when volume/price changed since the last message.
                        if self.latest_buy1_volumn_dict.get(code) != "{}-{}".format(volumn, price):
                            logger_buy_1_volumn_record.info("{}-{}", code, data)
                            self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price)
                            # Correct the time, then save.
                            time_ = tool.compute_buy1_real_time(time_)
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)

            elif type_ == 30:
                # Heartbeat.
                data = data_process.parse(_str)["data"]
                client_id = data["client"]
                thsDead = data.get("thsDead")
                logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                client_manager.saveClientActive(int(client_id), host, thsDead)
                if constant.is_windows():
                    if ths_util.is_ths_dead(client_id):
                        # TODO: restart THS.
                        # Raise an alarm for L2 clients.
                        l2_clients = authority.get_l2_clients()
                        if client_id in l2_clients:
                            alert_util.alarm()

            elif type_ == 60:
                # L2 auto-start succeeded.
                data = data_process.parse(_str)["data"]
                client_id = data["client"]
                print("L2自启动成功", client_id)
                now_str = tool.get_now_time_str()
                ts = tool.get_time_as_second(now_str)
                # An auto-start between 09:25 and 09:28 would need a batch
                # code assignment; deliberately disabled by `and False`.
                if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False:
                    # Prepare the batch code assignment.
                    return_json = {"code": 1, "msg": "等待批量设置代码"}
                    return_str = json.dumps(return_json)
                    # Take the 16 top-ranked codes.
                    codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16)
                    codes = sorted(codes)
                    if client_id == 2:
                        codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE]
                    else:
                        codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:]
                    codes_datas = []
                    for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                        if i >= len(codes):
                            break
                        codes_datas.append((i, codes[i]))
                    # Retry up to two more times on failure.
                    for i in range(0, 3):
                        set_success = l2_code_operate.betch_set_client_codes(client_id, codes_datas)
                        if set_success:
                            break
                        else:
                            time.sleep(3)
                else:
                    return_json = {"code": 0, "msg": "开启在线状态"}
                    return_str = json.dumps(return_json)

            elif type_ == 70:
                # Hot concepts (data is only printed).
                data_json = data_process.parse(_str)
                day = data_json["day"]
                datas = data_json["data"]
                print(datas)

            elif type_ == 72:
                # Limit-up codes of the two most recent trading days.
                day = tool.get_now_date_str()
                data_dict = {}
                for i in range(0, 2):
                    day = HistoryKDatasUtils.get_previous_trading_date_cache(day)
                    data_list = list(block_info.KPLLimitUpDataRecordManager.list_all(day))
                    codes_set = set()
                    if data_list:
                        for d in data_list:
                            if len(d[4]) > 6:
                                codes_set.add(d[3])
                    data_dict[day] = list(codes_set)
                return_str = json.dumps({"code": 0, "data": data_dict})

            elif type_ == 80:
                # Cancel an order.
                data = json.loads(_str)
                code = data["data"]["code"]
                if code:
                    state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                    if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
                        try:
                            l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
                            return_str = json.dumps({"code": 0})
                        except Exception as e:
                            return_str = json.dumps({"code": 2, "msg": str(e)})
                    else:
                        return_str = json.dumps({"code": 1, "msg": "未处于可撤单状态"})
                else:
                    return_str = json.dumps({"code": 1, "msg": "请上传代码"})

            elif type_ == 82:
                # List today's delegation records.
                data = json.loads(_str)
                update_time = data["data"]["update_time"]
                results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                    tool.get_now_date_str("%Y%m%d"), update_time)
                return_str = json.dumps(
                    {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "请上传代码"})

            elif type_ == 201:
                # Add to the blacklist.
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    l2_trade_util.forbidden_trade(code, msg="手动加入")
                    self._ensure_code_name(code)
                return_str = json.dumps({"code": 0})
                self.__notify_trade("black_list")

            elif type_ == 202:
                # Add to the whitelist.
                data = json.loads(_str)
                codes = data["data"]["codes"]
                try:
                    for code in codes:
                        gpcode_manager.WhiteListCodeManager().add_code(code)
                        self._ensure_code_name(code)
                    return_str = json.dumps({"code": 0})
                except Exception as e:
                    return_str = json.dumps({"code": 1, "msg": str(e)})
                self.__notify_trade("white_list")

            elif type_ == 203:
                # Remove from the blacklist.
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    l2_trade_util.remove_from_forbidden_trade_codes(code)
                return_str = json.dumps({"code": 0})
                self.__notify_trade("black_list")

            elif type_ == 204:
                # Remove from the whitelist.
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    gpcode_manager.WhiteListCodeManager().remove_code(code)
                return_str = json.dumps({"code": 0})
                self.__notify_trade("white_list")

            elif type_ == 301:
                # List the blacklist.
                codes = gpcode_manager.BlackListCodeManager().list_codes()
                return_str = json.dumps({"code": 0, "data": self._format_code_names(codes)})

            elif type_ == 302:
                # List the whitelist.
                codes = gpcode_manager.WhiteListCodeManager().list_codes()
                return_str = json.dumps({"code": 0, "data": self._format_code_names(codes)})

            elif type_ == 401:
                # Add to the want-buy list.
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    gpcode_manager.WantBuyCodesManager().add_code(code)
                    self._ensure_code_name(code)
                if "plates" in data["data"]:
                    # Pair codes with plates positionally; zip stops at the
                    # shorter list instead of raising IndexError.
                    for code, plate in zip(codes, data["data"]["plates"]):
                        self.__KPLCodeLimitUpReasonManager.save_reason(code, plate)
                return_str = json.dumps({"code": 0})
                self.__notify_trade("want_list")

            elif type_ == 402:
                # Remove from the want-buy list.
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    gpcode_manager.WantBuyCodesManager().remove_code(code)
                return_str = json.dumps({"code": 0})
                self.__notify_trade("want_list")

            elif type_ == 403:
                # List the want-buy codes, optionally filtered by plate.
                plate = None
                include_codes = set()
                if _str:
                    data = json.loads(_str)
                    plate = data.get("plate")
                    if plate:
                        code_map = self.__KPLCodeLimitUpReasonManager.list_all()
                        for k in code_map:
                            if code_map[k] == plate:
                                include_codes.add(k)

                codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
                # The plate '其他' disables the filter.
                shown_codes = [code for code in codes
                               if not (plate and plate != '其他' and code not in include_codes)]
                return_str = json.dumps({"code": 0, "data": self._format_code_names(shown_codes)})

            elif type_ == 411:
                # Add to the pause-buy list.
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    gpcode_manager.PauseBuyCodesManager().add_code(code)
                    self._ensure_code_name(code)
                return_str = json.dumps({"code": 0})
                self.__notify_trade("pause_buy_list")

            elif type_ == 412:
                # Remove from the pause-buy list.
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    gpcode_manager.PauseBuyCodesManager().remove_code(code)
                return_str = json.dumps({"code": 0})
                self.__notify_trade("pause_buy_list")

            elif type_ == 413:
                # List the pause-buy list.
                codes = gpcode_manager.PauseBuyCodesManager().list_code()
                return_str = json.dumps({"code": 0, "data": self._format_code_names(codes)})

            elif type_ == 420:
                # Can the order be cancelled?
                data = json.loads(_str)
                codes = data["data"]["codes"]
                code = codes[0]
                state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                if state != trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS and state != trade_manager.TRADE_STATE_BUY_SUCCESS:
                    return_str = json.dumps({"code": 0, "msg": "可以取消"})
                else:
                    return_str = json.dumps({"code": 1, "msg": "不可以取消"})

            elif type_ == 430:
                # Query which lists a code is on: want-buy / white / black / pause-buy.
                data = json.loads(_str)
                code = data["data"]["code"]
                code_name = gpcode_manager.get_code_name(code)
                want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
                white = gpcode_manager.WhiteListCodeManager().is_in_cache(code)
                black = l2_trade_util.is_in_forbidden_trade_codes(code)
                pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)

                desc_list = []
                if want:
                    desc_list.append("【想买单】")
                if white:
                    desc_list.append("【白名单】")
                if black:
                    desc_list.append("【黑名单】")
                if pause_buy:
                    desc_list.append("【暂不买】")
                return_str = json.dumps(
                    {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}})

            elif type_ == 501:
                # Open or close buying.
                data = json.loads(_str)
                is_open = data["data"]["open"]
                if is_open:
                    trade_manager.TradeStateManager().open_buy()
                else:
                    trade_manager.TradeStateManager().close_buy()
                return_str = json.dumps({"code": 0, "msg": ("开启成功" if is_open else "关闭成功")})
                self.__notify_trade("trade_state")

            elif type_ == 502:
                # Query whether buying is open.
                can_buy = trade_manager.TradeStateManager().is_can_buy_cache()
                return_str = json.dumps({"code": 0, "data": {"can_buy": can_buy}})

            elif type_ == 503:
                # Set the trade target-code mode.
                data = json.loads(_str)
                mode = data["data"]["mode"]
                try:
                    TradeTargetCodeModeManager().set_mode(mode)
                    return_str = json.dumps({"code": 0, "data": {"mode": mode}})
                except Exception as e:
                    return_str = json.dumps({"code": 1, "msg": str(e)})
                self.__notify_trade("trade_mode")

            elif type_ == 504:
                # Get the trade target-code mode.
                mode = TradeTargetCodeModeManager().get_mode_cache()
                return_str = json.dumps({"code": 0, "data": {"mode": mode}})

            elif type_ == 601:
                # Add to favourites (not implemented).
                pass

            elif type_ == 602:
                # Remove from favourites (not implemented).
                pass

            # sendall: socket.send may transmit only part of a large reply.
            sk.sendall(return_str.encode())

    def finish(self):
        """Run the base-class finish hook."""
        super().finish()
|
|
|
def send_msg(client_id, data):
    """Send *data* as JSON to a client on port 9006 and return its reply.

    Args:
        client_id: id passed to ``client_manager.getActiveClientIP`` to find
            the client's IP.
        data: JSON-serialisable payload.

    Returns:
        The first reply chunk (at most 1024 bytes) decoded as GBK.

    Raises:
        Exception: when no IP is known for ``client_id``.
        OSError: on connection or transfer failure.
    """
    _ip = client_manager.getActiveClientIP(client_id)
    if not _ip:
        raise Exception("客户端IP为空")
    # The context manager closes the socket even when connect() fails; the
    # original created and connected it outside its try/finally.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socketClient:
        socketClient.connect((_ip, 9006))
        # sendall: socket.send may transmit only part of the payload.
        socketClient.sendall(json.dumps(data).encode())
        recv = socketClient.recv(1024)
        return str(recv, encoding="gbk")
|
|
|
# 客户端心跳机制
|
def test_client_server():
    """Client heartbeat loop; never returns.

    Every 5 seconds sends ``{"action": "test"}`` to each L2 client and then
    calls ``l2_code_operate.correct_client_codes()``.
    """
    while True:
        clients = authority.get_l2_clients()
        for client in clients:
            try:
                send_msg(client, {"action": "test"})
            except Exception:
                # Best effort: an unreachable client must not stop the loop.
                # Narrowed from a bare except so KeyboardInterrupt/SystemExit
                # still propagate.
                pass
        # Correct the client codes.
        # NOTE(review): reconstructed as running once per cycle (after the
        # client loop), since the call takes no client argument - confirm.
        l2_code_operate.correct_client_codes()
        time.sleep(5)
|
|
|
# 获取采集客户端的状态
|
def get_client_env_state(client):
    """Return the environment state reported by a collector client.

    Sends ``{"action": "getEnvState"}`` and returns the JSON-decoded ``data``
    field of the reply.

    Raises:
        Exception: with the reply's ``msg`` when its ``code`` is not 0.
    """
    reply = json.loads(send_msg(client, {"action": "getEnvState"}))
    if reply["code"] != 0:
        raise Exception(reply["msg"])
    return json.loads(reply["data"])
|
|
|
# 修复采集客户端
|
def repair_client_env(client):
    """Ask a collector client to repair its environment.

    Sends ``{"action": "repairEnv"}``.

    Raises:
        Exception: with the reply's ``msg`` when its ``code`` is not 0.
    """
    reply = json.loads(send_msg(client, {"action": "repairEnv"}))
    if reply["code"] == 0:
        return
    raise Exception(reply["msg"])
|
|
|
# 同步目标标的到同花顺
|
def sync_target_codes_to_ths():
    """Send the codes from ``get_second_gp_list`` to the first "data-maintain" client.

    Returns:
        The client's raw reply string from ``send_msg``.
    """
    code_list = list(gpcode_manager.get_second_gp_list())
    maintain_clients = authority._get_client_ids_by_rule("data-maintain")
    payload = {"action": "syncTargetCodes", "data": code_list}
    return send_msg(maintain_clients[0], payload)
|
|
|
# 修复同花顺主站
|
def repair_ths_main_site(client):
    """Ask a client to update its THS site, then reset its listen-position health.

    Sends ``{"action": "updateTHSSite"}``. On success, passes every
    ``(client, index)`` pair for ``index`` in
    ``range(constant.L2_CODE_COUNT_PER_DEVICE)`` to
    ``l2_listen_pos_health_manager.init_all``.

    Raises:
        Exception: with the reply's ``msg`` when its ``code`` is not 0.
    """
    reply = json.loads(send_msg(client, {"action": "updateTHSSite"}))
    if reply["code"] != 0:
        raise Exception(reply["msg"])
    # Speed test succeeded.
    positions = [(client, index) for index in range(constant.L2_CODE_COUNT_PER_DEVICE)]
    l2_listen_pos_health_manager.init_all(positions)
|
|
|
# Fallback for when trade-success data cannot be read the normal way.
# The guard compares against "__main__1", which __name__ never equals, so
# this loop is disabled as written.
if __name__ == "__main__1":
    while True:
        try:
            datas = trade_juejin.get_execution_reports()
            # Convert the execution reports to the upload format.
            fdatas = [
                {"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7], "trade_num": d[5],
                 "type": d[1] - 1}
                for d in datas
            ]
            print(fdatas)
            if fdatas:
                try:
                    trade_manager.process_trade_success_data(fdatas)
                except Exception as e:
                    logging.exception(e)
                # NOTE(review): reconstructed as running after the try/except
                # (not only on failure) - confirm.
                trade_manager.save_trade_success_data(fdatas)
        except Exception as e:
            # Keep polling on failure, but record why. The original bare
            # `except: pass` also swallowed KeyboardInterrupt.
            logging.exception(e)
        time.sleep(1.5)
|