Administrator
2023-07-07 2324df309e160a0e47992718ccdb37271ccee05f
华鑫适配
3个文件已修改
1个文件已添加
93 ■■■■■ 已修改文件
l2/huaxin/huaxin_target_codes_manager.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py
New file
@@ -0,0 +1,30 @@
"""
华鑫目标代码管理
"""
import json
from db import redis_manager
# Shared manager created once at import time; the argument 0 is presumably the
# Redis database index — TODO confirm against db.redis_manager.
redisManager = redis_manager.RedisManager(0)
def __get_redis():
    """Return the Redis client handed out by the module-level ``redisManager``."""
    client = redisManager.getRedis()
    return client
# Redis key of the list that holds pending L2 target-code payloads.
__L2_CODE_KEY = "huaxin_l2_code_list"
def clear():
    """Delete the whole pending-payload list stored under ``__L2_CODE_KEY``."""
    client = __get_redis()
    client.delete(__L2_CODE_KEY)
def push(datas):
    """Serialize ``datas`` to JSON and push it onto the head of the Redis list.

    :param datas: any JSON-serializable payload (``json.dumps`` raises otherwise).
    """
    payload = json.dumps(datas)
    __get_redis().lpush(__L2_CODE_KEY, payload)
def pop():
    """Pop one payload from the head of the Redis list and decode it from JSON.

    :return: the decoded payload, or the raw falsy reply unchanged when the
        list yields nothing usable.

    NOTE(review): ``push`` uses ``lpush`` and this uses ``lpop``, so the most
    recently pushed payload is returned first — confirm that this ordering is
    what consumers expect.
    """
    raw = __get_redis().lpop(__L2_CODE_KEY)
    # A falsy reply is passed through as-is instead of being JSON-decoded.
    return json.loads(raw) if raw else raw
main.py
@@ -11,25 +11,6 @@
import l2.l2_data_util
def __read_server_pipe(pipe):
    """Read JSON messages from ``pipe`` forever and serve ``clear_l2`` requests.

    Returns immediately when ``pipe`` is falsy. For each ``clear_l2`` message
    with a 6-character code, the code's L2 data is cleared through
    ``l2_data_manager`` and evicted from the in-memory caches in
    ``l2.l2_data_util``.

    :param pipe: object exposing a ``recv()`` method that yields JSON text
        (or ``None``).
    """
    if not pipe:
        return
    while True:
        message = pipe.recv()
        if message is not None:
            message = json.loads(message)
            if message.get("type") == "clear_l2":
                code = message["data"]["code"]
                print("清除l2数据", code)
                if len(code) != 6:
                    # Ignore codes of unexpected length; this also skips the
                    # pause at the bottom of the loop.
                    continue
                l2_data_manager.clear_l2_data(code)
                # Drop the cached level-2 data for this code.
                for cache_name in ("local_today_datas", "local_latest_datas"):
                    cache = getattr(l2.l2_data_util, cache_name)
                    if cache and code in cache:
                        cache.pop(code)
        time.sleep(0.1)
# 交易服务
@@ -45,14 +26,10 @@
# 主服务
def createServer(pipe_juejin, pipe_gui):
def createServer():
    # Starts a TCP server on port 9001 (empty host string) and serves requests
    # with MyBaseRequestHandle until the process ends.
    print("create Server")
    t1 = threading.Thread(target=lambda: __read_server_pipe(pipe_gui), daemon=True)
    t1.start()
    laddr = "", 9001
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_juejin=pipe_juejin)  # NOTE: the argument is MyBaseRequestHandle
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle, pipe_juejin=None)  # NOTE: the argument is MyBaseRequestHandle
    # tcpserver.handle_request()  # accepts only a single client connection
    tcpserver.serve_forever()  # loops forever; can accept multiple client connections
@@ -64,12 +41,7 @@
if __name__ == '__main__':
    # Entry point: createServer runs in a child process while
    # createTradeServer runs in this (main) process.
    # tradeServerProcess = multiprocessing.Process(target=createTradeServer)
    serverProcess = multiprocessing.Process(target=createServer, args=(None, None,))
    jueJinTradeProcess = multiprocessing.Process(target=trade_juejin.run)
    # tradeServerProcess.start()
    serverProcess = multiprocessing.Process(target=createServer)
    serverProcess.start()
    jueJinTradeProcess.start()
    # Run tradeServer as the main process
    createTradeServer()
trade/current_price_process_manager.py
@@ -5,6 +5,7 @@
import decimal
import logging
from l2.huaxin import huaxin_target_codes_manager
from ths import client_manager
import constant
from code_attribute import gpcode_manager
@@ -121,7 +122,7 @@
                min_volume = 50 * 10000 // limit_up_price
                add_datas.append((d, min_volume, limit_up_price))
                try:
                    huaxin_trade_api.set_l2_codes_data(add_datas)
                    huaxin_target_codes_manager.push(add_datas)
                except Exception as e:
                    logging.exception(e)
        else:
trade/huaxin/trade_server.py
@@ -12,7 +12,7 @@
import constant
from code_attribute import gpcode_manager
from l2 import l2_data_manager_new, l2_data_log
from l2.huaxin import l2_huaxin_util
from l2.huaxin import l2_huaxin_util, huaxin_target_codes_manager
from logs_.log import logger_l2_error
from trade.huaxin.huaxin_log import logger_l2_orderdetail, logger_l2_transaction, logger_l2_upload, \
    logger_contact_debug, logger_trade_callback, logger_trade_debug
@@ -155,7 +155,8 @@
                        origin_start_time = round(time.time() * 1000)
                        try:
                            # 转换数据格式
                            datas = l2_huaxin_util.get_format_l2_datas(code, datas, gpcode_manager.get_limit_up_price(code))
                            datas = l2_huaxin_util.get_format_l2_datas(code, datas,
                                                                       gpcode_manager.get_limit_up_price(code))
                            __start_time = round(time.time() * 1000)
                            l2_data_manager_new.L2TradeDataProcessor().process_add_datas(code, datas, 0, __start_time)
                        except Exception as e:
@@ -331,10 +332,25 @@
            time.sleep(1)
def __set_target_codes():
    """Forward queued L2 target codes to the Huaxin trade API, forever.

    Each iteration pops at most one payload from
    ``huaxin_target_codes_manager``, submits it through
    ``huaxin_trade_api.set_l2_codes_data`` and prints the result. Any
    exception is logged and the loop carries on. Every iteration ends with a
    one-second pause.
    """
    while True:
        try:
            pending = huaxin_target_codes_manager.pop()
            if pending:
                print("设置L2代码结果:", huaxin_trade_api.set_l2_codes_data(pending))
        except Exception as err:
            logging.exception(err)
        finally:
            # Pause on every path: empty queue, success, or failure.
            time.sleep(1)
def run():
    t1 = threading.Thread(target=lambda: read_trade_data_queue())
    # 后台运行
    t1.setDaemon(True)
    # 拉取交易信息
    t1 = threading.Thread(target=lambda: read_trade_data_queue(), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: __set_target_codes(), daemon=True)
    t1.start()
    laddr = "0.0.0.0", 10008