Administrator
2022-09-08 e7f8c6013d777dd5ba10b8d548d2d3db6158d37a
server.py
@@ -2,6 +2,7 @@
import logging
import socketserver
import socket
import threading
import time
import data_process
@@ -66,7 +67,9 @@
                        __start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, datas = l2_data_manager.parseL2Data(_str)
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                        try:
                            # print("L2数据接受",day,code,len(datas))
                            # 查询
@@ -98,8 +101,12 @@
                            if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                key = "{}-{}-{}".format(client, channel, code)
                                if key not in self.l2_data_error_dict or round(
                                        time.time() * 1000) - self.l2_data_error_dict[key] > 2000:
                                    self.l2CodeOperate.repaire_l2_data(code)
                                            time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                        # self.l2CodeOperate.repaire_l2_data(code)
                                        # todo 太敏感移除代码
                                        logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                        # 单价不一致时需要移除代码重新添加
                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code)
                                    self.l2_data_error_dict[key] = round(time.time() * 1000)
                        except Exception as e:
@@ -114,10 +121,6 @@
                                logger_l2_process.info("l2处理时间:{}-{}".format(code, __end_time - __start_time));
                    except:
                        pass
                elif type == 10:
                    # level2交易队列
                    try:
@@ -126,7 +129,6 @@
                            data_process.saveL2Data(day, code, setData)
                    except:
                        print("异常")
                elif type == 1:
                    # 设置股票代码
                    data_list = data_process.parseGPCode(_str)
@@ -138,7 +140,10 @@
                    gpcode_manager.set_gp_list(code_list)
                    # 重新订阅
                    self.server.pipe.send(json.dumps({"type": "resub"}))
                    sync_target_codes_to_ths()
                    # 同步同花顺目标代码
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                    t1.setDaemon(True)
                    t1.start()
                elif type == 2:
                    # 涨停代码
                    codeList = data_process.parseGPCode(_str)
@@ -185,17 +190,18 @@
                            {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
                    except Exception as e:
                        return_str = data_process.toJson({"code": 1, "msg": str(e)})
                # 现价更新
                elif type == 40:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        print("现价数量", len(data))
                    for item in data:
                        juejin.accpt_price(item["code"], float(item["price"]))
                elif type == 30:
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    if "memery" in data:
                        mem = data["memery"]
                        logger_device.info("({})内存使用率:{}".format(client_id, mem))
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    data_process.saveClientActive(int(client_id), host)
                    # print("心跳:", client_id)
@@ -235,7 +241,8 @@
                send_msg(client, {"action": "test"})
            except:
                pass
        # 矫正客户端代码
        l2_code_operate.correct_client_codes()
        time.sleep(5)