"""
|
接受客户端数据的服务器
|
"""
|
import codecs
import decimal
import json
import logging
import socket
import socketserver
import time

from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer
from utils import data_process, global_util, ths_industry_util, tool, import_util, socket_util
from code_attribute import code_volumn_manager, global_data_loader, gpcode_manager, first_target_code_data_processor
from l2 import l2_data_manager_new, l2_data_manager, code_price_manager
import l2.l2_data_util

from third_data import block_info
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager
from trade import trade_data_manager, trade_manager, l2_trade_util, \
    current_price_process_manager, trade_juejin, trade_constant
import l2.transaction_progress

from log_module.log import logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_trade_buy_queue, logger_debug
from trade.huaxin import huaxin_trade_record_manager
from trade.trade_manager import TradeTargetCodeModeManager
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
|
|
# Modules resolved by dotted name through import_util.import_lib rather than a
# plain import statement.
# NOTE(review): presumably import_lib returns None when the module is not
# available - the request handler compares trade_gui against None before
# using it; confirm against import_util.
ths_util = import_util.import_lib("ths.ths_util")
trade_gui = import_util.import_lib("trade.trade_gui")
|
|
|
class MyTCPServer(socketserver.TCPServer):
    """TCP server that carries two extra pipe handles for its request handlers.

    Handlers reach the pipes through ``self.server.pipe_trade`` and
    ``self.server.pipe_ui``.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_trade=None, pipe_ui=None):
        """Store the pipes, initialise block data, then start the TCP server.

        Args:
            server_address: Address passed through to ``socketserver.TCPServer``.
            RequestHandlerClass: Handler class instantiated for each connection.
            bind_and_activate: Passed through to ``socketserver.TCPServer``.
            pipe_trade: Extra parameter; object used to notify the trade side
                (may be None).
            pipe_ui: Extra parameter; pipe for the UI side (may be None).
        """
        # The pipes are set before the base initialiser runs, as in the
        # original ordering.
        self.pipe_trade, self.pipe_ui = pipe_trade, pipe_ui
        # Initialise data.
        block_info.init()
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
|
|
|
# For the asynchronous (threaded) form, ThreadingTCPServer has to be re-derived
# on top of MyTCPServer.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """Variant of MyTCPServer combined with ``socketserver.ThreadingMixIn``."""
|
|
|
# 首板tick级数据
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler that decodes GBK text messages and dispatches them by type.

    A message may start with a ``##`` marker followed by an 8-character length
    field; in that case the handler keeps receiving until the announced length
    is reached. The numeric message type is obtained from
    ``data_process.parseType`` and selects one branch in
    ``__process_message``. Every message is answered with a reply string
    ("OK" unless the branch produces its own reply).

    NOTE(review): the source this class was rebuilt from had lost its
    indentation; nesting was reconstructed from statement order. Spots where
    more than one reading was possible carry a NOTE(review) comment.
    """

    # Size passed to every recv() call.
    _RECV_SIZE = 1024 * 100

    # Class-level state: shared by every handler instance in the process.
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
    ths_l2_trade_queue_manager = thsl2tradequeuemanager()

    latest_buy1_volumn_dict = {}
    l2_trade_queue_time_dict = {}
    l2_save_time_dict = {}
    l2_trade_buy_queue_dict = {}
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    first_tick_datas = []
    latest_oringin_data = {}
    last_l2_listen_health_time = {}
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    # Current prices collected on the L2 monitor, keyed by code.
    __l2_current_price_data = {}

    def setup(self):
        """Delegate to the base class; nothing extra is prepared here."""
        super().setup()

    def __notify_trade(self, type_):
        """Send ``{"type": type_}`` as JSON over the server's trade pipe, if one is set."""
        if self.server.pipe_trade:
            self.server.pipe_trade.send(json.dumps({"type": type_}))

    @staticmethod
    def __ensure_code_name(code):
        """Look up and store the name of ``code`` when no name is known yet."""
        if gpcode_manager.get_code_name(code):
            return
        results = HistoryKDatasUtils.get_gp_codes_names([code])
        if results:
            gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])

    @staticmethod
    def __format_code_names(codes):
        """Return a ``"name:code"`` string for every code in ``codes``."""
        return [f"{gpcode_manager.get_code_name(code)}:{code}" for code in codes]

    def handle(self):
        """Receive messages until the client disconnects, replying to each one.

        Changes over the previous implementation:
        * a peer that closes the connection in the middle of a framed message
          ends the handler instead of spinning forever on empty reads;
        * bytes are decoded incrementally, so a GBK character split across two
          recv() calls no longer raises;
        * the reply is sent with ``sendall`` so it cannot be truncated.
        """
        super().handle()
        sk: socket.socket = self.request
        while True:
            data = sk.recv(self._RECV_SIZE)
            if not data:
                # Client disconnected.
                break
            # One decoder per message: it buffers a trailing partial character
            # until the next chunk arrives.
            decoder = codecs.getincrementaldecoder("gbk")()
            text = decoder.decode(data)
            if not text:
                continue
            msg_type = -1
            try:
                # Message carries a header.
                if text.startswith("##"):
                    total_length = int(text[2:10])
                    text = text[10:]
                    # Keep reading so that split socket data is reassembled.
                    # NOTE(review): the announced length is compared with the
                    # number of decoded characters, not bytes - confirm this
                    # matches what the client puts in the header.
                    while total_length > len(text):
                        chunk = sk.recv(self._RECV_SIZE)
                        if not chunk:
                            # Peer closed before the full frame arrived.
                            return
                        text += decoder.decode(chunk)
                msg_type = data_process.parseType(text)
            except Exception as e:
                if "Unterminated string starting" in str(e):
                    # Retry once with newlines stripped.
                    text = text.replace("\n", "")
                    msg_type = data_process.parseType(text)
                else:
                    print(text)
            return_str = self.__process_message(msg_type, text)
            sk.sendall(return_str.encode())

    def __process_trade_delegate(self, text):
        """Handle message type 5: trade delegate (order entrust) records.

        Identical consecutive payloads are processed only once per handler.
        """
        logger_trade_delegate.debug("接收到委托信息")
        data_list = data_process.parseList(text)
        if self.last_trade_delegate_data == text:
            return
        self.last_trade_delegate_data = text
        # Record the delegate information.
        logger_trade_delegate.info(data_list)
        try:
            # Derive the apply (declaration) time of each record.
            for item in data_list:
                apply_time = item["apply_time"]
                if not apply_time or len(apply_time) < 8:
                    continue
                code = item["code"]
                trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                # Only codes currently in the "buy order placed" state.
                if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER:
                    origin_apply_time = apply_time
                    apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6])
                    ms = origin_apply_time[6:9]
                    if int(ms) > 500:
                        # Round up: time + 1s.
                        apply_time = tool.trade_time_add_second(apply_time, 1)
                    # NOTE(review): the computed time is only printed; placement
                    # of this print inside the state check is a reconstruction.
                    print(apply_time)
        except Exception as e:
            logging.exception(e)

        try:
            trade_manager.process_trade_delegate_data(data_list)
        except Exception as e:
            logging.exception(e)
        trade_manager.save_trade_delegate_data(data_list)
        # Refresh the trade GUI.
        # NOTE(review): assumed to run only when the payload changed - confirm.
        if trade_gui is not None:
            trade_gui.THSGuiTrade().refresh_data()

    def __process_l2_trade_queue(self, text):
        """Handle message type 10: an L2 buy/sell queue snapshot for one code.

        Codes that are neither in the target pool nor in the first-board cache
        are ignored. Processing longer than 0.1 s is reported via
        ``logger_debug`` together with per-step timings.
        """
        started_at = time.time()
        datas = data_process.parseData(text)
        code = datas["code"]
        msg = ""
        try:
            if not gpcode_manager.is_in_gp_pool(code) and \
                    not gpcode_manager.FirstGPCodesManager().is_in_first_gp_codes_cache(code):
                # Neither a target code nor one of today's first-board codes.
                return

            data = datas["data"]
            buy_time = data["buyTime"]
            buy_one_price = data["buyOnePrice"]
            buy_one_volumn = data["buyOneVolumn"]
            sell_one_price = data["sellOnePrice"]
            sell_one_volumn = data["sellOneVolumn"]
            buy_queue = data["buyQueue"]
            if buy_one_price is None:
                print('买1价没有,', code)

            limit_up_price = gpcode_manager.get_limit_up_price(code)
            # NOTE(review): everything below is assumed to be guarded by the
            # limit-up price being known (the step timer is first set there).
            if limit_up_price is None:
                return

            code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_one_volumn, buy_time,
                                                          limit_up_price, sell_one_price, sell_one_volumn)
            step_start = time.time()
            msg += "买1价格处理:" + f"{step_start - started_at} "

            buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price,
                                                            buy_time, buy_queue)
            msg += "买队列保存:" + f"{time.time() - step_start} "
            step_start = time.time()

            if buy_queue_result_list:
                # Queue data available: locate the traded position.
                try:
                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                        decimal.Decimal("0.00"))
                    order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
                    # Time of the execution position; stays None when it
                    # cannot be determined.
                    exec_time = None
                    try:
                        if order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1:
                            exec_time = l2.l2_data_util.local_today_datas.get(code)[
                                order_begin_pos.buy_exec_index]["val"]["time"]
                    except Exception:
                        pass
                    buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_,
                                                                                 buy_queue_result_list,
                                                                                 exec_time)
                    if buy_progress_index is None:
                        raise Exception("暂未获取到交易进度")
                    LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                                                               buy_progress_index,
                                                               l2.l2_data_util.local_today_datas.get(code))
                    logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code,
                                                   buy_progress_index,
                                                   json.dumps(buy_queue_result_list))
                    msg += "计算成交进度:" + f"{time.time() - step_start} "
                    step_start = time.time()
                except Exception as e:
                    logging.exception(e)
                    print("买入队列", code, buy_queue_result_list)
                    logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{} 数据-{}", code, str(e),
                                                      json.dumps(buy_queue_result_list))

            # Log the buy queue only when it changed.
            previous_queue = self.l2_trade_buy_queue_dict.get(code)
            if previous_queue is None or buy_queue != previous_queue:
                self.l2_trade_buy_queue_dict[code] = buy_queue
                logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                msg += "保存记录日志:" + f"{time.time() - step_start} "
                step_start = time.time()

            # Keep the most recent record.
            if self.ths_l2_trade_queue_manager.save_recod(code, data):
                # NOTE(review): the buy-1 volume save is assumed to sit inside
                # the "00:00:00" check - confirm.
                if buy_time != "00:00:00":
                    logger_l2_trade_queue.info("{}-{}", code, data)
                    self.buy1_volumn_manager.save(code, buy_time, int(buy_one_volumn), buy_one_price)
                msg += "买1处理:" + f"{time.time() - step_start} "
        except Exception as e:
            # Previously swallowed silently by a bare except; keep going but
            # leave a trace.
            logging.exception(e)
        finally:
            space = time.time() - started_at
            if space > 0.1:
                logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg)

    def __process_message(self, msg_type, text):
        """Run the action selected by ``msg_type`` and return the reply string.

        Args:
            msg_type: Numeric type from ``data_process.parseType`` (-1 when
                parsing failed).
            text: Decoded message text without the length header.

        Returns:
            The reply to send back; "OK" when the branch sets no reply of its
            own or the type is unknown.
        """
        return_str = "OK"
        if msg_type == 2:
            # Limit-up codes.
            data_list, _ = data_process.parseGPCode(text)
            gpcode_manager.set_limit_up_list(data_list)
            # Keep in memory.
            # NOTE(review): both calls are assumed to be guarded by the
            # non-empty check - confirm for set_industry_hot_num.
            if data_list:
                global_data_loader.add_limit_up_codes(data_list)
                ths_industry_util.set_industry_hot_num(data_list)
            # A loop that used to persist limit-up times had been reduced to
            # `continue` statements only and was removed.
        elif msg_type == 22:
            print("---接受到首板代码")
            try:
                if int(tool.get_now_time_str().replace(":", "")) < int("092500"):
                    raise Exception('未到接受时间')
                # First-board codes.
                data_list, _ = data_process.parseGPCode(text)
                tick_datas = first_target_code_data_processor.process_first_codes_datas(data_list)
                # Keep the current prices for the next type-40 message.
                self.first_tick_datas.clear()
                self.first_tick_datas.extend(tick_datas)
            except Exception as e:
                logging.exception(e)
            finally:
                print("首板代码处理完毕:")
                return_str = socket_util.load_header(json.dumps({"code": 0}).encode("utf-8")).decode("utf-8")
        elif msg_type == 3:
            # Trade success records.
            data_list = data_process.parseList(text)
            try:
                trade_manager.process_trade_success_data(data_list)
            except Exception as e:
                logging.exception(e)
            trade_manager.save_trade_success_data(data_list)
        elif msg_type == 5:
            self.__process_trade_delegate(text)
        elif msg_type == 4:
            # Industry code information.
            data_list = data_process.parseList(text)
            codes = []
            for datas in data_list:
                for d in datas:
                    name = ths_industry_util.get_name_by_code(d['code'])
                    if not name or name == 'None':
                        codes.append(d["code"])
            # Fetch the names of codes that have none yet.
            codes_name = HistoryKDatasUtils.get_gp_codes_names(codes) if codes else {}
            ths_industry_util.save_industry_code(data_list, codes_name)
        elif msg_type == 6:
            # Available money.
            datas = data_process.parseData(text)
            trade_data_manager.AccountMoneyManager().set_available_money(datas["client"], datas["money"])
        elif msg_type == 10:
            # L2 trade queue.
            self.__process_l2_trade_queue(text)
        elif msg_type == 40:
            # Current price update.
            datas = data_process.parse(text)["data"]
            if datas is None:
                datas = []
            print("二板现价")
            # Append the first-board prices stored by the last type-22 message.
            if self.first_tick_datas:
                datas.extend(self.first_tick_datas)
            print("二板现价数量", len(datas))
            for item in datas:
                code_volumn_manager.CodeVolumeManager().save_today_volumn(item["code"], item["volume"],
                                                                         item["volumeUnit"])
            current_price_process_manager.accept_prices(datas)
        elif msg_type == 41:
            # L2 current price update.
            datas = data_process.parse(text)["data"]
            if datas:
                for d in datas:
                    self.__l2_current_price_data[d["code"]] = d
        elif msg_type == 50:
            data = data_process.parse(text)["data"]
            if data is not None:
                code_name = data["codeName"].replace(" ", "")
                volumn = data["volumn"]
                price = data["price"]
                time_ = data["time"]
                code = global_util.name_codes.get(code_name)
                if code is None:
                    # Reload the name -> code map once and retry.
                    global_data_loader.load_name_codes()
                    code = global_util.name_codes.get(code_name)
                if code is not None:
                    # Log only when volume/price changed.
                    fingerprint = "{}-{}".format(volumn, price)
                    if self.latest_buy1_volumn_dict.get(code) != fingerprint:
                        logger_buy_1_volumn_record.info("{}-{}", code, data)
                        self.latest_buy1_volumn_dict[code] = fingerprint
                    # Correct the time, then store the record.
                    time_ = tool.compute_buy1_real_time(time_)
                    self.buy1_volumn_manager.save(code, time_, volumn, price)
        elif msg_type == 70:
            # Hot concepts (xuangubao); currently only printed.
            print(data_process.parse(text)["data"])
        elif msg_type == 72:
            # Limit-up codes of the two most recent trading days.
            day = tool.get_now_date_str()
            data_dict = {}
            for _ in range(2):
                day = HistoryKDatasUtils.get_previous_trading_date_cache(day)
                records = list(block_info.KPLLimitUpDataRecordManager.list_all(day))
                data_dict[day] = list({d[3] for d in records if len(d[4]) > 6})
            return_str = json.dumps({"code": 0, "data": data_dict})
        elif msg_type == 80:
            # Cancel an order.
            code = json.loads(text)["data"]["code"]
            if code:
                state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                if state in (trade_constant.TRADE_STATE_BUY_PLACE_ORDER,
                             trade_constant.TRADE_STATE_BUY_DELEGATED,
                             trade_constant.TRADE_STATE_BUY_CANCEL_ING):
                    try:
                        l2_data_manager_new.L2TradeDataProcessor.cancel_buy(
                            code, "手动撤销", cancel_type=trade_constant.CANCEL_TYPE_HUMAN)
                        return_str = json.dumps({"code": 0})
                    except Exception as e:
                        return_str = json.dumps({"code": 2, "msg": str(e)})
                else:
                    return_str = json.dumps({"code": 1, "msg": "未处于可撤单状态"})
            else:
                return_str = json.dumps({"code": 1, "msg": "请上传代码"})
        elif msg_type == 82:
            # Delegate list.
            update_time = json.loads(text)["data"]["update_time"]
            results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                tool.get_now_date_str("%Y%m%d"), update_time)
            return_str = json.dumps(
                {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "请上传代码"})
        elif msg_type == 201:
            # Add to black list.
            for code in json.loads(text)["data"]["codes"]:
                l2_trade_util.forbidden_trade(code, msg="手动加入", force=True)
                self.__ensure_code_name(code)
            return_str = json.dumps({"code": 0})
            self.__notify_trade("black_list")
        elif msg_type == 202:
            # Add to white list. (Former restrictions on price above 30 and
            # free-float market value above 5 billion are disabled.)
            codes = json.loads(text)["data"]["codes"]
            try:
                for code in codes:
                    gpcode_manager.WhiteListCodeManager().add_code(code)
                    self.__ensure_code_name(code)
                return_str = json.dumps({"code": 0})
            except Exception as e:
                return_str = json.dumps({"code": 1, "msg": str(e)})
            self.__notify_trade("white_list")
        elif msg_type == 203:
            # Remove from black list.
            for code in json.loads(text)["data"]["codes"]:
                l2_trade_util.remove_from_forbidden_trade_codes(code)
            return_str = json.dumps({"code": 0})
            self.__notify_trade("black_list")
        elif msg_type == 204:
            # Remove from white list.
            for code in json.loads(text)["data"]["codes"]:
                gpcode_manager.WhiteListCodeManager().remove_code(code)
            return_str = json.dumps({"code": 0})
            self.__notify_trade("white_list")
        elif msg_type == 301:
            # Black list.
            codes = gpcode_manager.BlackListCodeManager().list_codes()
            return_str = json.dumps({"code": 0, "data": self.__format_code_names(codes)})
        elif msg_type == 302:
            # White list.
            codes = gpcode_manager.WhiteListCodeManager().list_codes()
            return_str = json.dumps({"code": 0, "data": self.__format_code_names(codes)})
        elif msg_type == 401:
            # Add to want-to-buy list.
            data = json.loads(text)
            codes = data["data"]["codes"]
            for code in codes:
                gpcode_manager.WantBuyCodesManager().add_code(code)
                self.__ensure_code_name(code)
            if "plates" in data["data"]:
                # zip() pairs code i with plate i and stops at the shorter
                # list; the index-based loop raised IndexError when more
                # plates than codes were sent.
                for code, plate in zip(codes, data["data"]["plates"]):
                    self.__KPLCodeLimitUpReasonManager.save_reason(code, plate)
            return_str = json.dumps({"code": 0})
            self.__notify_trade("want_list")
        elif msg_type == 402:
            # Remove from want-to-buy list.
            for code in json.loads(text)["data"]["codes"]:
                gpcode_manager.WantBuyCodesManager().remove_code(code)
            return_str = json.dumps({"code": 0})
            self.__notify_trade("want_list")
        elif msg_type == 403:
            # Want-to-buy list, optionally filtered by plate.
            plate = json.loads(text).get("plate")
            include_codes = set()
            if plate:
                code_map = self.__KPLCodeLimitUpReasonManager.list_all()
                include_codes = {k for k in code_map if code_map[k] == plate}
            codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
            datas = []
            for code in codes:
                if plate and plate != '其他' and code not in include_codes:
                    continue
                datas.append(f"{gpcode_manager.get_code_name(code)}:{code}")
            return_str = json.dumps({"code": 0, "data": datas})
        elif msg_type == 411:
            # Add to pause-buy list.
            for code in json.loads(text)["data"]["codes"]:
                gpcode_manager.PauseBuyCodesManager().add_code(code)
                self.__ensure_code_name(code)
            return_str = json.dumps({"code": 0})
            self.__notify_trade("pause_buy_list")
        elif msg_type == 412:
            # Remove from pause-buy list.
            for code in json.loads(text)["data"]["codes"]:
                gpcode_manager.PauseBuyCodesManager().remove_code(code)
            return_str = json.dumps({"code": 0})
            self.__notify_trade("pause_buy_list")
        elif msg_type == 413:
            # Pause-buy list.
            codes = gpcode_manager.PauseBuyCodesManager().list_code()
            return_str = json.dumps({"code": 0, "data": self.__format_code_names(codes)})
        elif msg_type == 420:
            # Whether the first code's order can still be cancelled.
            code = json.loads(text)["data"]["codes"][0]
            state = trade_manager.CodesTradeStateManager().get_trade_state(code)
            if state not in (trade_constant.TRADE_STATE_BUY_CANCEL_SUCCESS,
                             trade_constant.TRADE_STATE_BUY_SUCCESS):
                return_str = json.dumps({"code": 0, "msg": "可以取消"})
            else:
                return_str = json.dumps({"code": 1, "msg": "不可以取消"})
        elif msg_type == 430:
            # Query code attributes: want-to-buy / white / black / pause-buy.
            code = json.loads(text)["data"]["code"]
            code_name = gpcode_manager.get_code_name(code)
            want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
            white = gpcode_manager.WhiteListCodeManager().is_in_cache(code)
            black = l2_trade_util.is_in_forbidden_trade_codes(code)
            pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
            flags = ((want, "【想买单】"), (white, "【白名单】"), (black, "【黑名单】"), (pause_buy, "【暂不买】"))
            desc = "".join(label for active, label in flags if active)
            return_str = json.dumps({"code": 0, "data": {"code_info": (code, code_name), "desc": desc}})
        elif msg_type == 501:
            # Open or close buying.
            is_open = json.loads(text)["data"]["open"]
            if is_open:
                trade_manager.TradeStateManager().open_buy()
            else:
                trade_manager.TradeStateManager().close_buy()
            return_str = json.dumps({"code": 0, "msg": ("开启成功" if is_open else "关闭成功")})
            self.__notify_trade("trade_state")
        elif msg_type == 502:
            can_buy = trade_manager.TradeStateManager().is_can_buy_cache()
            return_str = json.dumps({"code": 0, "data": {"can_buy": can_buy}})
        elif msg_type == 503:
            # Set the trade target code mode.
            mode = json.loads(text)["data"]["mode"]
            try:
                TradeTargetCodeModeManager().set_mode(mode)
                return_str = json.dumps({"code": 0, "data": {"mode": mode}})
            except Exception as e:
                return_str = json.dumps({"code": 1, "msg": str(e)})
            self.__notify_trade("trade_mode")
        elif msg_type == 504:
            # Get the trade target code mode.
            mode = TradeTargetCodeModeManager().get_mode_cache()
            return_str = json.dumps({"code": 0, "data": {"mode": mode}})
        elif msg_type == 601:
            # Add to favourites: not implemented.
            pass
        elif msg_type == 602:
            # Remove from favourites: not implemented.
            pass
        return return_str

    def finish(self):
        """Delegate to the base class; nothing extra is cleaned up here."""
        super().finish()
|
|
|
if __name__ == "__main__1":
|
|
# 交易成功无法读取时备用
|
while True:
|
try:
|
datas = trade_juejin.get_execution_reports()
|
# 上传数据
|
fdatas = []
|
for d in datas:
|
fdatas.append(
|
{"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7], "trade_num": d[5],
|
"type": d[1] - 1})
|
print(fdatas)
|
if fdatas:
|
try:
|
trade_manager.process_trade_success_data(fdatas)
|
except Exception as e:
|
logging.exception(e)
|
trade_manager.save_trade_success_data(fdatas)
|
except:
|
pass
|
time.sleep(1.5)
|