| | |
| | | import decimal |
| | | import json |
| | | import logging |
| | | import random |
| | | import socketserver |
| | | import socket |
| | | import threading |
| | | import time |
| | | |
| | | from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer |
| | | from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util |
| | | from utils import data_process, global_util, ths_industry_util, tool, import_util, socket_util |
| | | from code_attribute import code_volumn_manager, global_data_loader, gpcode_manager, first_target_code_data_processor |
| | | import constant |
| | | from user import authority |
| | | from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager |
| | | import l2_data_util |
| | | from l2 import l2_data_manager_new, l2_data_manager, code_price_manager |
| | | import l2.l2_data_util |
| | | |
| | | from third_data import block_info |
| | | from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager |
| | | from third_data.history_k_data_util import HistoryKDatasUtils |
| | | from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager |
| | | from ths import l2_listen_pos_health_manager, l2_code_operate, client_manager |
| | | from trade import trade_data_manager, trade_manager, l2_trade_util, \ |
| | | current_price_process_manager, trade_juejin, trade_constant |
| | | from code_attribute.code_data_util import ZYLTGBUtil |
| | | import l2.transaction_progress |
| | | |
| | | from log_module.log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \ |
| | | logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_debug |
| | | from log_module.log import logger_trade_delegate, logger_buy_1_volumn_record, \ |
| | | logger_l2_trade_queue, logger_l2_trade_buy_queue, logger_debug |
| | | from trade.huaxin import huaxin_trade_record_manager |
| | | from trade.trade_manager import TradeTargetCodeModeManager |
| | | from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager |
| | |
| | | super().setup() # 可以不调用父类的setup()方法,父类的setup方法什么都没做 |
| | | # print("----setup方法被执行-----") |
| | | # print("打印传入的参数:", self.server.pipe_trade) |
| | | self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance() |
| | | |
| | | def __notify_trade(self, type_): |
| | | if self.server.pipe_trade: |
| | |
| | | else: |
| | | print(_str) |
| | | return_str = "OK" |
| | | if type == 0: |
| | | try: |
| | | |
| | | origin_start_time = round(time.time() * 1000) |
| | | __start_time = round(time.time() * 1000) |
| | | |
| | | # level2盘口数据 |
| | | day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data( |
| | | _str) |
| | | last_health_time = self.last_l2_listen_health_time.get((client, channel)) |
| | | # --------------------------------设置L2健康状态-------------------------------- |
| | | if last_health_time is None or __start_time - last_health_time > 1000: |
| | | self.last_l2_listen_health_time[(client, channel)] = __start_time |
| | | # 更新监听位健康状态 |
| | | if origin_datas_count == 0: |
| | | l2_listen_pos_health_manager.set_unhealthy(client, channel) |
| | | else: |
| | | l2_listen_pos_health_manager.set_healthy(client, channel) |
| | | |
| | | l2_log.threadIds[code] = random.randint(0, 100000) |
| | | if True: |
| | | # 间隔1s保存一条l2的最后一条数据 |
| | | if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ |
| | | code] >= 1000 and len(origin_datas) > 0: |
| | | self.l2_save_time_dict[code] = origin_start_time |
| | | logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1]) |
| | | |
| | | # 10ms的网络传输延时 |
| | | capture_timestamp = __start_time - process_time - 10 |
| | | # print("截图时间:", process_time) |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | "截图时间:{} 数据解析时间".format(process_time)) |
| | | |
| | | cid, pid = gpcode_manager.get_listen_code_pos(code) |
| | | |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | "l2获取代码位置耗时") |
| | | # 判断目标代码位置是否与上传数据位置一致 |
| | | if cid is not None and pid is not None and client == int(cid) and channel == int(pid): |
| | | # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas)) |
| | | l2_data_util.save_l2_latest_data_number(code, origin_datas_count) |
| | | # 保存l2数据条数 |
| | | if not origin_datas: |
| | | # or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)): |
| | | raise Exception("无新增数据") |
| | | # 保存最近的数据 |
| | | self.latest_oringin_data[code] = origin_datas |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price) |
| | | try: |
| | | # 校验客户端代码 |
| | | l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) |
| | | __start_time = round(time.time() * 1000) |
| | | if gpcode_manager.is_listen(code): |
| | | __start_time = l2_data_log.l2_time(code, |
| | | round(time.time() * 1000) - __start_time, |
| | | "l2外部数据预处理耗时") |
| | | l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) |
| | | __start_time = l2_data_log.l2_time(code, |
| | | round(time.time() * 1000) - __start_time, |
| | | "l2数据有效处理外部耗时", |
| | | False) |
| | | # 保存原始数据数量 |
| | | # l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) |
| | | # if round(time.time() * 1000) - __start_time > 20: |
| | | # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | # "异步保存原始数据条数耗时", |
| | | # False) |
| | | |
| | | except l2_data_manager.L2DataException as l: |
| | | # 单价不符 |
| | | if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: |
| | | key = "{}-{}-{}".format(client, channel, code) |
| | | if key not in self.l2_data_error_dict or round( |
| | | time.time() * 1000) - self.l2_data_error_dict[key] > 10000: |
| | | # self.l2CodeOperate.repaire_l2_data(code) |
| | | logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg) |
| | | # 单价不一致时需要移除代码重新添加 |
| | | l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误") |
| | | self.l2_data_error_dict[key] = round(time.time() * 1000) |
| | | |
| | | except Exception as e: |
| | | print("异常", str(e), code) |
| | | logging.exception(e) |
| | | logger_l2_error.error("出错:{}".format(str(e))) |
| | | logger_l2_error.error("内容:{}".format(_str)) |
| | | finally: |
| | | |
| | | __end_time = round(time.time() * 1000) |
| | | # 只记录大于40ms的数据 |
| | | if __end_time - origin_start_time > 100: |
| | | l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time, |
| | | "l2数据处理总耗时", |
| | | True) |
| | | except Exception as e: |
| | | if str(e).find("新增数据"): |
| | | pass |
| | | else: |
| | | logger_l2_error.exception(e) |
| | | |
| | | elif type == 2: |
| | | if type == 2: |
| | | # 涨停代码 |
| | | dataList, is_add = data_process.parseGPCode(_str) |
| | | # 设置涨停时间 |
| | |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | | if limit_up_price is not None: |
| | | code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_one_volumn, buy_time, limit_up_price, |
| | | code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_one_volumn, buy_time, |
| | | limit_up_price, |
| | | sell_one_price, sell_one_volumn) |
| | | _start_time = time.time() |
| | | msg += "买1价格处理:" + f"{_start_time - __start_time} " |
| | |
| | | buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( |
| | | decimal.Decimal("0.00")) |
| | | # 获取执行位时间 |
| | | order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code) |
| | | order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache( |
| | | code) |
| | | if True: |
| | | # 只有下单过后才获取交易进度 |
| | | exec_time = None |
| | | try: |
| | | if order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1: |
| | | exec_time = \ |
| | | l2.l2_data_util.local_today_datas.get(code)[order_begin_pos.buy_exec_index]["val"]["time"] |
| | | l2.l2_data_util.local_today_datas.get(code)[ |
| | | order_begin_pos.buy_exec_index]["val"]["time"] |
| | | except: |
| | | pass |
| | | buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, |
| | |
| | | buy_queue_result_list, |
| | | exec_time) |
| | | if buy_progress_index is not None: |
| | | LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index, buy_progress_index, |
| | | LCancelBigNumComputer().set_trade_progress(code, |
| | | order_begin_pos.buy_single_index, |
| | | buy_progress_index, |
| | | l2.l2_data_util.local_today_datas.get( |
| | | code)) |
| | | |
| | |
| | | space = time.time() - __start_time |
| | | if space > 0.1: |
| | | logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg) |
| | | |
| | | elif type == 20: |
| | | # 登录 |
| | | data = data_process.parse(_str)["data"] |
| | | try: |
| | | client_id, _authoritys = authority.login(data["account"], data["pwd"]) |
| | | return_str = data_process.toJson( |
| | | {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}}) |
| | | except Exception as e: |
| | | return_str = data_process.toJson({"code": 1, "msg": str(e)}) |
| | | # 现价更新 |
| | | elif type == 40: |
| | | datas = data_process.parse(_str)["data"] |
| | |
| | | for item in datas: |
| | | volumn = item["volume"] |
| | | volumnUnit = item["volumeUnit"] |
| | | code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit) |
| | | code_volumn_manager.CodeVolumeManager().save_today_volumn(item["code"], volumn, volumnUnit) |
| | | current_price_process_manager.accept_prices(datas) |
| | | # L2现价更新 |
| | | elif type == 41: |
| | |
| | | # if need_sync: |
| | | # # 同步数据 |
| | | # L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_) |
| | | elif type == 30: |
| | | # 心跳信息 |
| | | data = data_process.parse(_str)["data"] |
| | | client_id = data["client"] |
| | | thsDead = data.get("thsDead") |
| | | logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data))) |
| | | client_manager.saveClientActive(int(client_id), host, thsDead) |
| | | if constant.is_windows(): |
| | | # 动态导入 |
| | | |
| | | if ths_util.is_ths_dead(client_id): |
| | | # TODO 重启同花顺 |
| | | # 报警 |
| | | l2_clients = authority.get_l2_clients() |
| | | if client_id in l2_clients: |
| | | alert_util.alarm() |
| | | elif type == 60: |
| | | # L2自启动成功 |
| | | data = data_process.parse(_str)["data"] |
| | | client_id = data["client"] |
| | | print("L2自启动成功", client_id) |
| | | now_str = tool.get_now_time_str() |
| | | ts = tool.get_time_as_second(now_str) |
| | | # 9点25到9点28之间的自启动就需要批量设置代码,目前永远不执行 |
| | | if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False: |
| | | # 准备批量设置代码 |
| | | return_json = {"code": 1, "msg": "等待批量设置代码"} |
| | | return_str = json.dumps(return_json) |
| | | # 获取排名前16位的代码 |
| | | codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16) |
| | | codes = sorted(codes) |
| | | if client_id == 2: |
| | | codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE] |
| | | else: |
| | | codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:] |
| | | codes_datas = [] |
| | | for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE): |
| | | if i >= len(codes): |
| | | break |
| | | codes_datas.append((i, codes[i])) |
| | | # 如果设置失败需要重试2次 |
| | | for i in range(0, 3): |
| | | set_success = l2_code_operate.betch_set_client_codes(client_id, codes_datas) |
| | | if set_success: |
| | | break |
| | | else: |
| | | time.sleep(3) |
| | | else: |
| | | return_json = {"code": 0, "msg": "开启在线状态"} |
| | | return_str = json.dumps(return_json) |
| | | elif type == 70: |
| | | # 选股宝热门概念 |
| | | data_json = data_process.parse(_str) |
| | |
| | | state = trade_manager.CodesTradeStateManager().get_trade_state(code) |
| | | if state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_CANCEL_ING: |
| | | try: |
| | | l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销", cancel_type=trade_constant.CANCEL_TYPE_HUMAN) |
| | | l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销", |
| | | cancel_type=trade_constant.CANCEL_TYPE_HUMAN) |
| | | return_str = json.dumps({"code": 0}) |
| | | except Exception as e: |
| | | return_str = json.dumps({"code": 2, "msg": str(e)}) |
| | |
| | | data = json.loads(_str) |
| | | codes = data["data"]["codes"] |
| | | for code in codes: |
| | | l2_trade_util.forbidden_trade(code, msg="手动加入") |
| | | l2_trade_util.forbidden_trade(code, msg="手动加入", force=True) |
| | | name = gpcode_manager.get_code_name(code) |
| | | if not name: |
| | | results = HistoryKDatasUtils.get_gp_codes_names([code]) |
| | |
| | | # print("--------finish方法被执行---") |
| | | |
| | | |
| | | def send_msg(client_id, data): |
| | | _ip = client_manager.getActiveClientIP(client_id) |
| | | # print("ip", client_id, _ip) |
| | | if _ip is None or len(_ip) <= 0: |
| | | raise Exception("客户端IP为空") |
| | | socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| | | socketClient.connect((_ip, 9006)) |
| | | # 连接socket |
| | | try: |
| | | socketClient.send(json.dumps(data).encode()) |
| | | recv = socketClient.recv(1024) |
| | | result = str(recv, encoding="gbk") |
| | | return result |
| | | finally: |
| | | socketClient.close() |
| | | |
| | | |
| | | # 客户端心跳机制 |
| | | def test_client_server(): |
| | | while True: |
| | | clients = authority.get_l2_clients() |
| | | for client in clients: |
| | | # print("心跳", client) |
| | | try: |
| | | send_msg(client, {"action": "test"}) |
| | | except: |
| | | pass |
| | | # 矫正客户端代码 |
| | | l2_code_operate.correct_client_codes() |
| | | time.sleep(5) |
| | | |
| | | |
| | | # 获取采集客户端的状态 |
| | | def get_client_env_state(client): |
| | | result = send_msg(client, {"action": "getEnvState"}) |
| | | result = json.loads(result) |
| | | if result["code"] == 0: |
| | | return json.loads(result["data"]) |
| | | else: |
| | | raise Exception(result["msg"]) |
| | | |
| | | |
| | | # 修复采集客户端 |
| | | def repair_client_env(client): |
| | | result = send_msg(client, {"action": "repairEnv"}) |
| | | result = json.loads(result) |
| | | if result["code"] != 0: |
| | | raise Exception(result["msg"]) |
| | | |
| | | |
| | | # 同步目标标的到同花顺 |
| | | def sync_target_codes_to_ths(): |
| | | codes = gpcode_manager.get_second_gp_list() |
| | | code_list = [] |
| | | for code in codes: |
| | | code_list.append(code) |
| | | client = authority._get_client_ids_by_rule("data-maintain") |
| | | result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list}) |
| | | return result |
| | | |
| | | |
| | | # 修复同花顺主站 |
| | | def repair_ths_main_site(client): |
| | | result = send_msg(client, {"action": "updateTHSSite"}) |
| | | result = json.loads(result) |
| | | if result["code"] != 0: |
| | | raise Exception(result["msg"]) |
| | | else: |
| | | # 测速成功 |
| | | client_infos = [] |
| | | for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE): |
| | | client_infos.append((client, index)) |
| | | l2_listen_pos_health_manager.init_all(client_infos) |
| | | |
| | | |
| | | if __name__ == "__main__1": |
| | | |
| | | # 交易成功无法读取时备用 |
| | |
| | | trade_manager.save_trade_success_data(fdatas) |
| | | except: |
| | | pass |
| | | time.sleep(1.5) |
| | | time.sleep(1.5) |