| | |
| | | """ |
| | | 接受客户端数据的服务器 |
| | | """ |
| | | import datetime |
| | | import decimal |
| | | import json |
| | | import logging |
| | | import random |
| | | import socketserver |
| | | import socket |
| | | import threading |
| | | import time |
| | | |
| | | import cv2 |
| | | |
| | | import alert_util |
| | | import client_manager |
| | |
| | | import l2_data_manager |
| | | import l2_data_manager_new |
| | | import l2_data_util |
| | | import limit_up_time_manager |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer |
| | | |
| | | from ocr import ocr_util |
| | | import ths_industry_util |
| | | import ths_util |
| | | import tool |
| | |
| | | import trade_manager |
| | | import l2_code_operate |
| | | from code_data_util import ZYLTGBUtil |
| | | import l2.transaction_progress |
| | | |
| | | from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \ |
| | | logger_l2_trade_queue, logger_l2_latest_data |
| | | logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue |
| | | from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager |
| | | |
| | | |
| | |
| | | buy1_price_manager = Buy1PriceManager() |
| | | l2_trade_queue_time_dict = {} |
| | | l2_save_time_dict = {} |
| | | l2_trade_buy_queue_dict = {} |
| | | tradeBuyQueue = l2.transaction_progress.TradeBuyQueue() |
| | | |
| | | def setup(self): |
| | | super().setup() # 可以不调用父类的setup()方法,父类的setup方法什么都没做 |
| | |
| | | # print("- " * 30) |
| | | sk: socket.socket = self.request |
| | | while True: |
| | | data = sk.recv(1024000) |
| | | data = sk.recv(1024 * 1024 * 20) |
| | | if len(data) == 0: |
| | | # print("客户端断开连接") |
| | | break |
| | |
| | | try: |
| | | origin_start_time = round(time.time() * 1000) |
| | | __start_time = round(time.time() * 1000) |
| | | do_id = random.randint(0, 100000) |
| | | |
| | | # level2盘口数据 |
| | | day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data( |
| | |
| | | # 10ms的网络传输延时 |
| | | capture_timestamp = __start_time - process_time - 10 |
| | | # print("截图时间:", process_time) |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, |
| | | "截图时间:{} 数据解析时间".format(process_time)) |
| | | |
| | | cid, pid = gpcode_manager.get_listen_code_pos(code) |
| | | |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, |
| | | "l2获取代码位置耗时") |
| | | # 判断目标代码位置是否与上传数据位置一致 |
| | | if cid is not None and pid is not None and client == int(cid) and channel == int(pid): |
| | |
| | | l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) |
| | | __start_time = round(time.time() * 1000) |
| | | if gpcode_manager.is_listen(code): |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | __start_time = l2_data_log.l2_time(code, do_id, |
| | | round(time.time() * 1000) - __start_time, |
| | | "l2外部数据预处理耗时") |
| | | l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp, |
| | | do_id) |
| | | __start_time = l2_data_log.l2_time(code, do_id, |
| | | round(time.time() * 1000) - __start_time, |
| | | "l2数据有效处理外部耗时", |
| | | False) |
| | | # 保存原始数据数量 |
| | | l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) |
| | | if round(time.time() * 1000) - __start_time > 20: |
| | | l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, |
| | | "异步保存原始数据条数耗时", |
| | | False) |
| | | |
| | |
| | | __end_time = round(time.time() * 1000) |
| | | # 只记录大于40ms的数据 |
| | | if __end_time - origin_start_time > 100: |
| | | l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time, |
| | | l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time, |
| | | "l2数据处理总耗时", |
| | | True) |
| | | except Exception as e: |
| | |
| | | apply_time = tool.trade_time_add_second(apply_time, 1) |
| | | |
| | | print(apply_time) |
| | | l2_data_manager_new.SecondAverageBigNumComputer.set_apply_time(code, apply_time) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | |
| | | buy_time = data["buyTime"] |
| | | buy_one_price = data["buyOnePrice"] |
| | | buy_one_volumn = data["buyOneVolumn"] |
| | | buy_queue = data["buyQueue"] |
| | | buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code), |
| | | buy_queue) |
| | | if buy_queue_result_list: |
| | | # 有数据 |
| | | try: |
| | | buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(decimal.Decimal("0.00")) |
| | | buy_progress_index = self.tradeBuyQueue.save_traded_index(code,buy_one_price_, |
| | | buy_queue_result_list) |
| | | if buy_progress_index is not None: |
| | | HourCancelBigNumComputer.set_trade_progress(code,buy_progress_index) |
| | | logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code, |
| | | buy_progress_index, |
| | | json.loads(buy_queue_result_list)) |
| | | except Exception as e: |
| | | logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{} 数据-{}", code, str(e), |
| | | json.loads(buy_queue_result_list)) |
| | | |
| | | # buy_queue是否有变化 |
| | | if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get( |
| | | code): |
| | | self.l2_trade_buy_queue_dict[code] = buy_queue |
| | | logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue) |
| | | # 保存最近的记录 |
| | | if self.ths_l2_trade_queue_manager.save_recod(code, data): |
| | | if buy_time != "00:00:00": |
| | |
| | | if client_id in l2_clients: |
| | | alert_util.alarm() |
| | | elif type == 60: |
| | | # 心跳信息 |
| | | # L2自启动成功 |
| | | data = data_process.parse(_str)["data"] |
| | | client_id = data["client"] |
| | | print("L2自启动成功", client_id) |
| | | now_str = tool.get_now_time_str() |
| | | ts = tool.get_time_as_second(now_str) |
| | | # 9点25到9点28之间的自启动就需要批量设置代码 |
| | | if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00"): |
| | | # 9点25到9点28之间的自启动就需要批量设置代码,目前永远不执行 |
| | | if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False: |
| | | # 准备批量设置代码 |
| | | return_json = {"code": 1, "msg": "等待批量设置代码"} |
| | | return_str = json.dumps(return_json) |
| | |
| | | break |
| | | else: |
| | | time.sleep(3) |
| | | |
| | | |
| | | else: |
| | | return_json = {"code": 0, "msg": "开启在线状态"} |
| | | return_str = json.dumps(return_json) |
| | | |
| | | # print("心跳:", client_id) |
| | | elif type == 100: |
| | | # 图像识别 |
| | | return_str = data_process.toJson({"code": 0, "data": {"datas": []}}) |
| | | pass |
| | | sk.send(return_str.encode()) |
| | | |
| | | # print("----------handler end ----------") |
| | |
| | | |
| | | if __name__ == "__main__": |
| | | try: |
| | | thsl2tradequeuemanager().test() |
| | | a=round(float("0002.90"),2) |
| | | print(decimal.Decimal(a).quantize(decimal.Decimal("0.00"))) |
| | | # repair_ths_main_site(2) |
| | | except Exception as e: |
| | | print(str(e)) |