import logging
|
import multiprocessing
|
import threading
|
import time
|
|
import zmq
|
|
import constant
|
from huaxin_client.communication.l2_communication import L2ChannelCommunicationParams, L2SharedMemoryDataUtil
|
|
|
class L2Strategy:
    """Receiving side of the L2 channel demo.

    For every channel two ZeroMQ REP sockets are bound: one for order
    ("delegate") notifications and one for trade ("deal") notifications.
    A request only signals that new data is available; the data itself is
    read from the channel's shared-memory object through
    ``L2SharedMemoryDataUtil.get_data``.
    """

    def __init__(self, commu_params):
        """Create the ZeroMQ context and remember the channel descriptors.

        Args:
            commu_params: Iterable of channel descriptors. ``run`` reads
                ``delegate_ipc_addr``, ``delegate_data_shared_memory``,
                ``deal_ipc_addr`` and ``deal_data_shared_memory`` from
                each item.
        """
        self.context = zmq.Context()
        self.commu_params = commu_params

    def _serve(self, ipc_addr, shared_memory, label):
        """Bind a REP socket on *ipc_addr* and answer requests forever.

        Args:
            ipc_addr: ZeroMQ endpoint to bind.
            shared_memory: Object handed to
                ``L2SharedMemoryDataUtil.get_data`` on every request.
            label: Text inserted into the printed status line to tell the
                two data kinds apart.

        This method never returns normally; transport errors raised by the
        socket propagate to the caller.
        """
        rep_socket = self.context.socket(zmq.REP)
        rep_socket.bind(ipc_addr)
        while True:
            try:
                msg = rep_socket.recv_json()
            except ValueError as e:
                # The frame was received but could not be decoded as JSON.
                # A REP socket must answer before it may receive again, so
                # reply and keep serving instead of letting the thread die
                # and leaving the requester blocked.
                logging.exception(e)
                rep_socket.send_string("SUCCESS")
                continue
            try:
                # msg['time'] is the sender's time.time(); convert the
                # difference to milliseconds.
                use_time = (time.time() - msg['time']) * 1000
                decoded_data, decoded_use_time = L2SharedMemoryDataUtil.get_data(shared_memory)
                print(f"获取到{label}数据: 数据长度:{len(decoded_data)} 通信耗时:{use_time}ms 解码耗时:{decoded_use_time}s")
            except Exception as e:
                logging.exception(e)
            finally:
                # Always reply, even when processing failed, so the REQ
                # peer is released.
                rep_socket.send_string("SUCCESS")

    def rec_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Serve order ("delegate") notifications on *delegate_ipc_addr* forever."""
        self._serve(delegate_ipc_addr, shared_memory, "逐笔委托")

    def rec_deal_data(self, deal_ipc_addr, shared_memory):
        """Serve trade ("deal") notifications on *deal_ipc_addr* forever."""
        self._serve(deal_ipc_addr, shared_memory, "逐笔成交")

    def run(self):
        """Start one delegate and one deal receiver thread per channel.

        Returns immediately after the threads are started. The threads are
        daemon threads, so they are stopped as soon as the process's main
        thread finishes; the caller has to keep the process alive for them
        to do any work.
        """
        for param in self.commu_params:
            receivers = (
                (self.rec_delegate_data, param.delegate_ipc_addr, param.delegate_data_shared_memory),
                (self.rec_deal_data, param.deal_ipc_addr, param.deal_data_shared_memory),
            )
            for target, addr, shared_memory in receivers:
                threading.Thread(target=target, args=(addr, shared_memory), daemon=True).start()
|
|
|
class L2DataCollector:
    """Sending side of the L2 channel demo.

    For every channel two ZeroMQ REQ sockets are connected: one for order
    ("delegate") data and one for trade ("deal") data. Each cycle writes a
    fixed sample payload into the channel's shared-memory object through
    ``L2SharedMemoryDataUtil.set_data`` and then sends a small JSON
    notification carrying the send timestamp.
    """

    def __init__(self, commu_params):
        """Create the ZeroMQ context and remember the channel descriptors.

        Args:
            commu_params: Iterable of channel descriptors. ``run`` reads
                ``delegate_ipc_addr``, ``delegate_data_shared_memory``,
                ``deal_ipc_addr`` and ``deal_data_shared_memory`` from
                each item.
        """
        self.context = zmq.Context()
        self.commu_params = commu_params

    def _publish(self, ipc_addr, shared_memory, code, interval, ack_message=None):
        """Connect a REQ socket to *ipc_addr* and publish sample data forever.

        Args:
            ipc_addr: ZeroMQ endpoint to connect to.
            shared_memory: Object handed to
                ``L2SharedMemoryDataUtil.set_data`` on every cycle.
            code: First field of the sample record.
            interval: Seconds to sleep after every cycle, successful or not.
            ack_message: Optional text printed after the reply arrives.

        This method never returns.
        """
        req_socket = self.context.socket(zmq.REQ)
        req_socket.connect(ipc_addr)
        # The sample payload never changes, so build it once.
        # NOTE(review): repeating a tuple yields one flat 900-item tuple,
        # not 100 separate records - confirm this is what set_data expects.
        datas = (code, 7.52, 400, 93000030, 2012, 380422, 380421, 375477, '1') * 100
        while True:
            try:
                L2SharedMemoryDataUtil.set_data(datas, shared_memory)
                req_socket.send_json({'data': [], "time": time.time()})
                # The reply content is not used; receiving it completes the
                # REQ/REP exchange so the next send is allowed.
                req_socket.recv_string()
                if ack_message is not None:
                    print(ack_message)
            except Exception as e:
                logging.exception(e)
            finally:
                time.sleep(interval)

    def send_delegate_data(self, delegate_ipc_addr, shared_memory):
        """Publish sample order ("delegate") data every 5 seconds, forever."""
        self._publish(delegate_ipc_addr, shared_memory, '000990', 5, ack_message="收到委托反馈")

    def send_deal_data(self, deal_ipc_addr, shared_memory):
        """Publish sample trade ("deal") data every 10 seconds, forever."""
        self._publish(deal_ipc_addr, shared_memory, '888888', 10)

    def run(self):
        """Start one delegate and one deal sender thread per channel.

        Returns immediately after the threads are started. The threads are
        daemon threads, so they are stopped as soon as the process's main
        thread finishes; the caller has to keep the process alive for them
        to do any work.
        """
        for param in self.commu_params:
            senders = (
                (self.send_delegate_data, param.delegate_ipc_addr, param.delegate_data_shared_memory),
                (self.send_deal_data, param.deal_ipc_addr, param.deal_data_shared_memory),
            )
            for target, addr, shared_memory in senders:
                threading.Thread(target=target, args=(addr, shared_memory), daemon=True).start()
|
|
|
def _run_and_block(worker):
    """Process entry point: start *worker*'s threads, then stay alive.

    ``worker.run()`` only starts daemon threads and returns. If it were used
    directly as a ``multiprocessing.Process`` target, the child's main thread
    would finish right away and the daemon threads would be stopped before
    handling any traffic. Sleeping here keeps the child process running.

    Defined at module level (outside the ``__main__`` guard) so that start
    methods which re-import this module in the child can locate it.
    """
    worker.run()
    while True:
        time.sleep(100)


if __name__ == "__main__":
    # Demo: one receiver process and one sender process sharing
    # `channel_count` channels, each with its own pair of endpoints and
    # shared-memory objects.
    channel_count = 100
    params = []
    for i in range(channel_count):
        delegate_data_array = L2SharedMemoryDataUtil.create_shared_memory()
        deal_data_array = L2SharedMemoryDataUtil.create_shared_memory()
        param = L2ChannelCommunicationParams(
            delegate_ipc_addr=f"ipc://order_{i}.ipc",
            deal_ipc_addr=f"ipc://deal_{i}.ipc",
            delegate_data_shared_memory=delegate_data_array,
            deal_data_shared_memory=deal_data_array)
        if constant.is_windows():
            # On Windows the ipc:// endpoints are replaced with loopback
            # TCP ports, one port per channel and data kind.
            param.delegate_ipc_addr = f"tcp://127.0.0.1:{15000 + i}"
            param.deal_ipc_addr = f"tcp://127.0.0.1:{16000 + i}"
        params.append(param)

    strategy = L2Strategy(params)
    data_collector = L2DataCollector(params)
    strategy_process = multiprocessing.Process(target=_run_and_block, args=(strategy,))
    l2_process = multiprocessing.Process(target=_run_and_block, args=(data_collector,))
    strategy_process.start()
    l2_process.start()
    # Keep the parent process alive indefinitely.
    while True:
        time.sleep(100)
|