| | |
| | | import gpcode_manager |
| | | import authority |
| | | import juejin |
| | | import l2_data_log |
| | | from l2 import l2_data_manager_new, l2_data_manager |
| | | from l2 import l2_data_manager_new, l2_data_manager, l2_data_log |
| | | import l2_data_util |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil |
| | | import l2.l2_data_util |
| | | |
| | | import ths_industry_util |
| | | import ths_util |
| | |
| | | l2_save_time_dict = {} |
| | | l2_trade_buy_queue_dict = {} |
| | | tradeBuyQueue = l2.transaction_progress.TradeBuyQueue() |
| | | last_time = {} |
| | | |
| | | def setup(self): |
| | | super().setup() # 可以不调用父类的setup()方法,父类的setup方法什么都没做 |
| | |
| | | # print("- " * 30) |
| | | sk: socket.socket = self.request |
| | | while True: |
| | | data = sk.recv(1024 * 1024 * 20) |
| | | data = sk.recv(1024 * 100) |
| | | if len(data) == 0: |
| | | # print("客户端断开连接") |
| | | break |
| | | _str = str(data, encoding="gbk") |
| | | if len(_str) > 0: |
| | | # print("结果:",_str) |
| | | type = data_process.parseType(_str) |
| | | type = -1 |
| | | try: |
| | | type = data_process.parseType(_str) |
| | | except: |
| | | print(_str) |
| | | return_str = "OK" |
| | | if type == 0: |
| | | |
| | | try: |
| | | origin_start_time = round(time.time() * 1000) |
| | | __start_time = round(time.time() * 1000) |
| | | do_id = random.randint(0, 100000) |
| | | |
| | | # level2盘口数据 |
| | | day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data( |
| | | day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data( |
| | | _str) |
| | | # 间隔1s保存一条l2的最后一条数据 |
| | | if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ |
| | | code] >= 1000 and len(datas) > 0: |
| | | self.l2_save_time_dict[code] = origin_start_time |
| | | logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1]) |
| | | if channel == 0: |
| | | now_time = round(time.time() * 1000) |
| | | if self.last_time.get(channel) is not None: |
| | | #print("接受到L2的数据", channel, now_time - self.last_time.get(channel), "解析耗时",now_time - origin_start_time) |
| | | pass |
| | | |
| | | # 10ms的网络传输延时 |
| | | capture_timestamp = __start_time - process_time - 10 |
| | | # print("截图时间:", process_time) |
| | | __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, |
| | | "截图时间:{} 数据解析时间".format(process_time)) |
| | | self.last_time[channel] = now_time |
| | | |
| | | cid, pid = gpcode_manager.get_listen_code_pos(code) |
| | | if True: |
| | | # 间隔1s保存一条l2的最后一条数据 |
| | | if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ |
| | | code] >= 1000 and len(datas) > 0: |
| | | self.l2_save_time_dict[code] = origin_start_time |
| | | logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1]) |
| | | |
| | | __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, |
| | | "l2获取代码位置耗时") |
| | | # 判断目标代码位置是否与上传数据位置一致 |
| | | if cid is not None and pid is not None and client == int(cid) and channel == int(pid): |
| | | try: |
| | | # 校验客户端代码 |
| | | l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) |
| | | __start_time = round(time.time() * 1000) |
| | | if gpcode_manager.is_listen(code): |
| | | __start_time = l2_data_log.l2_time(code, do_id, |
| | | round(time.time() * 1000) - __start_time, |
| | | "l2外部数据预处理耗时") |
| | | l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp, |
| | | do_id) |
| | | __start_time = l2_data_log.l2_time(code, do_id, |
| | | round(time.time() * 1000) - __start_time, |
| | | "l2数据有效处理外部耗时", |
| | | False) |
| | | # 保存原始数据数量 |
| | | l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) |
| | | if round(time.time() * 1000) - __start_time > 20: |
| | | l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, |
| | | "异步保存原始数据条数耗时", |
| | | False) |
| | | # 10ms的网络传输延时 |
| | | capture_timestamp = __start_time - process_time - 10 |
| | | # print("截图时间:", process_time) |
| | | __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, |
| | | "截图时间:{} 数据解析时间".format(process_time)) |
| | | |
| | | except l2_data_manager.L2DataException as l: |
| | | # 单价不符 |
| | | if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: |
| | | key = "{}-{}-{}".format(client, channel, code) |
| | | if key not in self.l2_data_error_dict or round( |
| | | time.time() * 1000) - self.l2_data_error_dict[key] > 10000: |
| | | # self.l2CodeOperate.repaire_l2_data(code) |
| | | # todo 太敏感移除代码 |
| | | logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg) |
| | | # 单价不一致时需要移除代码重新添加 |
| | | l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误") |
| | | self.l2_data_error_dict[key] = round(time.time() * 1000) |
| | | cid, pid = gpcode_manager.get_listen_code_pos(code) |
| | | |
| | | except Exception as e: |
| | | print("异常", str(e), code) |
| | | logging.exception(e) |
| | | logger_l2_error.error("出错:{}".format(str(e))) |
| | | logger_l2_error.error("内容:{}".format(_str)) |
| | | finally: |
| | | __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, |
| | | "l2获取代码位置耗时") |
| | | # 判断目标代码位置是否与上传数据位置一致 |
| | | if cid is not None and pid is not None and client == int(cid) and channel == int(pid): |
| | | try: |
| | | # 校验客户端代码 |
| | | l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) |
| | | __start_time = round(time.time() * 1000) |
| | | if gpcode_manager.is_listen(code): |
| | | __start_time = l2_data_log.l2_time(code, do_id, |
| | | round(time.time() * 1000) - __start_time, |
| | | "l2外部数据预处理耗时") |
| | | l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp, |
| | | do_id) |
| | | __start_time = l2_data_log.l2_time(code, do_id, |
| | | round(time.time() * 1000) - __start_time, |
| | | "l2数据有效处理外部耗时", |
| | | False) |
| | | # 保存原始数据数量 |
| | | l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) |
| | | if round(time.time() * 1000) - __start_time > 20: |
| | | l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time, |
| | | "异步保存原始数据条数耗时", |
| | | False) |
| | | |
| | | __end_time = round(time.time() * 1000) |
| | | # 只记录大于40ms的数据 |
| | | if __end_time - origin_start_time > 100: |
| | | l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time, |
| | | "l2数据处理总耗时", |
| | | True) |
| | | except l2_data_manager.L2DataException as l: |
| | | # 单价不符 |
| | | if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: |
| | | key = "{}-{}-{}".format(client, channel, code) |
| | | if key not in self.l2_data_error_dict or round( |
| | | time.time() * 1000) - self.l2_data_error_dict[key] > 10000: |
| | | # self.l2CodeOperate.repaire_l2_data(code) |
| | | # todo 太敏感移除代码 |
| | | logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg) |
| | | # 单价不一致时需要移除代码重新添加 |
| | | l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误") |
| | | self.l2_data_error_dict[key] = round(time.time() * 1000) |
| | | |
| | | except Exception as e: |
| | | print("异常", str(e), code) |
| | | logging.exception(e) |
| | | logger_l2_error.error("出错:{}".format(str(e))) |
| | | logger_l2_error.error("内容:{}".format(_str)) |
| | | finally: |
| | | |
| | | __end_time = round(time.time() * 1000) |
| | | # 只记录大于40ms的数据 |
| | | if __end_time - origin_start_time > 100: |
| | | l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time, |
| | | "l2数据处理总耗时", |
| | | True) |
| | | except Exception as e: |
| | | logger_l2_error.exception(e) |
| | | elif type == 1: |
| | |
| | | |
| | | elif type == 5: |
| | | logger_trade_delegate.debug("接收到委托信息") |
| | | # 交易委托信息 |
| | | dataList = data_process.parseList(_str) |
| | | if self.last_trade_delegate_data != _str: |
| | | self.last_trade_delegate_data = _str |
| | | # 保存委托信息 |
| | | logger_trade_delegate.info(dataList) |
| | | __start_time = round(time.time() * 1000) |
| | | try: |
| | | # 设置申报时间 |
| | | for item in dataList: |
| | | apply_time = item["apply_time"] |
| | | if apply_time and len(apply_time) >= 8: |
| | | code = item["code"] |
| | | trade_state = trade_manager.get_trade_state(code) |
| | | # 设置下单状态的代码为已委托 |
| | | if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: |
| | | origin_apply_time = apply_time |
| | | apply_time = apply_time[0:6] |
| | | apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6]) |
| | | ms = origin_apply_time[6:9] |
| | | if int(ms) > 500: |
| | | # 时间+1s |
| | | apply_time = tool.trade_time_add_second(apply_time, 1) |
| | | # 交易委托信息 |
| | | dataList = data_process.parseList(_str) |
| | | if self.last_trade_delegate_data != _str: |
| | | self.last_trade_delegate_data = _str |
| | | # 保存委托信息 |
| | | logger_trade_delegate.info(dataList) |
| | | try: |
| | | # 设置申报时间 |
| | | for item in dataList: |
| | | apply_time = item["apply_time"] |
| | | if apply_time and len(apply_time) >= 8: |
| | | code = item["code"] |
| | | trade_state = trade_manager.get_trade_state(code) |
| | | # 设置下单状态的代码为已委托 |
| | | if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: |
| | | origin_apply_time = apply_time |
| | | apply_time = apply_time[0:6] |
| | | apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], |
| | | apply_time[4:6]) |
| | | ms = origin_apply_time[6:9] |
| | | if int(ms) > 500: |
| | | # 时间+1s |
| | | apply_time = tool.trade_time_add_second(apply_time, 1) |
| | | |
| | | print(apply_time) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | print(apply_time) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | try: |
| | | trade_manager.process_trade_delegate_data(dataList) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | trade_manager.save_trade_delegate_data(dataList) |
| | | # 刷新交易界面 |
| | | trade_gui.THSGuiTrade().refresh_data() |
| | | try: |
| | | trade_manager.process_trade_delegate_data(dataList) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | trade_manager.save_trade_delegate_data(dataList) |
| | | # 刷新交易界面 |
| | | trade_gui.THSGuiTrade().refresh_data() |
| | | finally: |
| | | pass |
| | | |
| | | elif type == 4: |
| | | # 行业代码信息 |
| | |
| | | buy_one_price = data["buyOnePrice"] |
| | | buy_one_volumn = data["buyOneVolumn"] |
| | | buy_queue = data["buyQueue"] |
| | | buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code), |
| | | buy_one_price, buy_time, |
| | | buy_queue) |
| | | if buy_queue_result_list: |
| | | # 有数据 |
| | | try: |
| | | buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( |
| | | decimal.Decimal("0.00")) |
| | | buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_, |
| | | buy_queue_result_list) |
| | | if buy_progress_index is not None: |
| | | HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index, |
| | | l2_data_manager.local_today_datas.get(code), |
| | | l2_data_manager.local_today_num_operate_map.get( |
| | | code)) |
| | | logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code, |
| | | buy_progress_index, |
| | | json.dumps(buy_queue_result_list)) |
| | | except Exception as e: |
| | | print("买入队列", code, buy_queue_result_list) |
| | | logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{} 数据-{}", code, str(e), |
| | | json.dumps(buy_queue_result_list)) |
| | | if buy_one_price is None: |
| | | print('买1价没有,', code) |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if limit_up_price is not None: |
| | | buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time, |
| | | buy_queue) |
| | | if buy_queue_result_list: |
| | | # 有数据 |
| | | try: |
| | | buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( |
| | | decimal.Decimal("0.00")) |
| | | buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_, |
| | | buy_queue_result_list) |
| | | if buy_progress_index is not None: |
| | | HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index, |
| | | l2.l2_data_util.local_today_datas.get( |
| | | code), |
| | | l2.l2_data_util.local_today_num_operate_map.get( |
| | | code)) |
| | | logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code, |
| | | buy_progress_index, |
| | | json.dumps(buy_queue_result_list)) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | print("买入队列", code, buy_queue_result_list) |
| | | logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{} 数据-{}", code, str(e), |
| | | json.dumps(buy_queue_result_list)) |
| | | |
| | | # buy_queue是否有变化 |
| | | if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get( |
| | |
| | | l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") |
| | | if need_sync: |
| | | # 同步数据 |
| | | L2LimitUpMoneyStatisticUtil.verify_num(0, code, int(buy_one_volumn), buy_time) |
| | | L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time) |
| | | # print(buy_time, buy_one_price, buy_one_volumn) |
| | | |
| | | # print("L2买卖队列",datas) |