| | |
| | | import http |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import queue |
| | | import socketserver |
| | | import threading |
| | | import time |
| | | from http.server import BaseHTTPRequestHandler |
| | | |
| | | import psutil |
| | | import requests |
| | | |
| | | from code_attribute import global_data_loader |
| | | from huaxin_client import l2_client_test, l1_subscript_codes_manager |
| | | from log_module.log import logger_local_huaxin_l2_upload |
| | | from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \ |
| | | logger_local_huaxin_l2_transaction_accurate_big_order |
| | | from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager |
| | | from third_data.history_k_data_manager import HistoryKDataManager |
| | | from third_data.history_k_data_util import HistoryKDatasUtils |
| | | from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager |
| | | from utils import tool, middle_api_protocol, global_util |
| | | import urllib.parse as urlparse |
| | | from urllib.parse import parse_qs |
| | | |
| | | |
| | | def run(): |
| | | codes_sh, codes_sz = l1_subscript_codes_manager.get_codes() |
| | | codes = [x.decode() for x in codes_sh] |
| | | codes.extend([x.decode() for x in codes_sz]) |
| | | codes.sort() |
| | | cpu_count = 16 |
| | | page_size = int(len(codes) / cpu_count) + 1 |
| | | class DataServer(BaseHTTPRequestHandler): |
| | | # 禁用日志输出 |
| | | def log_message(self, format, *args): |
| | | pass |
| | | |
| | | big_order_queue = multiprocessing.Queue() |
| | | def do_GET(self): |
| | | path = self.path |
| | | url = urlparse.urlparse(path) |
| | | response_data = "" |
| | | if url.path == "/get_block_codes_money": |
| | | # 获取板块对应的代码与该代码的净流入 |
| | | ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) |
| | | block = ps_dict.get('block') |
| | | try: |
| | | fdatas = BlockInMoneyRankManager().get_block_codes_money(block) |
| | | response_data = json.dumps({"code": 0, "data": fdatas}) |
| | | except Exception as e: |
| | | response_data = json.dumps({"code": 1, "msg": str(e)}) |
| | | elif url.path == "/get_big_order_list": |
| | | # 获获取代码的大买/卖单列表 |
| | | ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) |
| | | code = ps_dict.get('code') |
| | | try: |
| | | buy_datas = CodeInMoneyManager().get_big_buy_money_list(code) |
| | | if buy_datas is None: |
| | | buy_datas = [] |
| | | sell_datas = CodeInMoneyManager().get_big_sell_money_list(code) |
| | | if sell_datas is None: |
| | | sell_datas = [] |
| | | response_data = json.dumps({"code": 0, "data": {"buy": buy_datas, "sell": sell_datas}}) |
| | | except Exception as e: |
| | | response_data = json.dumps({"code": 1, "msg": str(e)}) |
| | | elif url.path == "/get_code_money_info": |
| | | # 获取代码金额 |
| | | ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) |
| | | code = ps_dict.get('code') |
| | | money_info = CodeInMoneyManager().get_money_info(code) |
| | | response_data = json.dumps({"code": 0, "data": money_info}) |
| | | elif url.path == "/get_codes_money_info": |
| | | ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) |
| | | codes_str = ps_dict.get('codes') |
| | | codes = json.loads(codes_str) |
| | | fresults = {} |
| | | for code in codes: |
| | | money_info = CodeInMoneyManager().get_money_info(code) |
| | | fresults[code] = money_info |
| | | response_data = json.dumps({"code": 0, "data": fresults}) |
| | | self.send_response(200) |
| | | # 发给请求客户端的响应数据 |
| | | self.send_header('Content-type', 'application/json') |
| | | self.end_headers() |
| | | self.wfile.write(response_data.encode()) |
| | | |
| | | for i in range(cpu_count): |
| | | process = multiprocessing.Process(target=l2_client_test.run, |
| | | args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,)) |
| | | process.start() |
| | | |
| | | class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer): |
| | | pass |
| | | |
| | | |
| | | def __run_server(addr, port): |
| | | handler = DataServer |
| | | try: |
| | | httpd = ThreadedHTTPServer((addr, port), handler) |
| | | print("L2_IN_MONEY HTTP server is at: http://%s:%d/" % (addr, port)) |
| | | httpd.serve_forever() |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | | logger_system.error(f"端口服务器:{port} 启动失败") |
| | | |
| | | |
| | | def __run_upload_big_order_task(_queue: queue.Queue): |
| | | # 运行上传大单任务 |
| | | while True: |
| | | try: |
| | | data = big_order_queue.get() |
| | | logger_local_huaxin_l2_upload.info(f"{data}") |
| | | datas = [] |
| | | while not _queue.empty(): |
| | | datas.append(_queue.get()) |
| | | if datas: |
| | | # 上传数据 |
| | | requests.post("http://192.168.84.71:12881/upload_deal_big_orders", json.dumps(datas)) |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(1) |
| | | |
| | | |
| | | def __get_special_codes(): |
| | | """ |
| | | 获取特殊的代码,需要订阅300w以上的大单 |
| | | @return: 代码集合 |
| | | """ |
| | | try: |
| | | zylt_volume_map = global_util.zylt_volume_map |
| | | codes = set() |
| | | last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0] |
| | | for code in zylt_volume_map: |
| | | if code == '601288': |
| | | print("") |
| | | volume = zylt_volume_map.get(code) |
| | | # 今日涨停价要突破昨日最高价 |
| | | k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) |
| | | if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8: |
| | | # 自由流通市值在10亿-300亿以上 |
| | | limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2) |
| | | if limit_up_price > k_bars[0]["high"] or True: |
| | | # 今日涨停价要突破昨日最高价 |
| | | codes.add(code) |
| | | # 获取辨识度的票 |
| | | special_codes = BlockSpecialCodesManager().get_origin_code_blocks_dict().keys() |
| | | if special_codes: |
| | | codes |= set(special_codes) |
| | | return codes |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | | return set() |
| | | |
| | | |
| | | def __save_accurate_big_order(big_accurate_order_queue): |
| | | while True: |
| | | try: |
| | | data = big_accurate_order_queue.get() |
| | | logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}") |
| | | except: |
| | | pass |
| | | |
| | | |
| | | def run(): |
| | | special_codes = __get_special_codes() |
| | | codes_sh, codes_sz = l1_subscript_codes_manager.get_codes() |
| | | codes = [x.decode() for x in codes_sh] |
| | | codes.extend([x.decode() for x in codes_sz]) |
| | | codes.sort() |
| | | # 绑定CPU的核心是0-16 |
| | | cpu_count = 16 |
| | | page_size = int(len(codes) / cpu_count) + 1 |
| | | |
| | | big_order_queue = multiprocessing.Queue(maxsize=1024) |
| | | big_accurate_order_queue = multiprocessing.Queue(maxsize=1024) |
| | | # 大单上传队列 |
| | | big_order_upload_queue = queue.Queue(maxsize=1024) |
| | | |
| | | for i in range(cpu_count): |
| | | process = multiprocessing.Process(target=l2_client_test.run, |
| | | args=( |
| | | codes[i * page_size:(i + 1) * page_size], big_order_queue, |
| | | big_accurate_order_queue, special_codes,)) |
| | | |
| | | process.start() |
| | | # 绑核运行 |
| | | psutil.Process(process.pid).cpu_affinity([i]) |
| | | threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start() |
| | | threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start() |
| | | |
| | | while True: |
| | | try: |
| | | data = big_order_queue.get() |
| | | CodeInMoneyManager().add_data(data) |
| | | # 添加上传数据 |
| | | big_order_upload_queue.put_nowait(data) |
| | | logger_local_huaxin_l2_transaction_big_order.info(f"{data}") |
| | | except: |
| | | pass |
| | | |
| | | |
| | | def __compute_and_upload(): |
| | | def __upload_data(type_, data_): |
| | | root_data = { |
| | | "type": type_, |
| | | "data": data_ |
| | | } |
| | | requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data)) |
| | | |
| | | def __upload_codes_in_money(): |
| | | """ |
| | | 上传所有代码的流入 |
| | | @param type_: |
| | | @param data_: |
| | | @return: |
| | | """ |
| | | |
| | | root_data = { |
| | | "data": json.dumps(CodeInMoneyManager().get_code_money_dict()) |
| | | } |
| | | requests.post("http://127.0.0.1:9004/upload_codes_in_money", json.dumps(root_data)) |
| | | |
| | | while True: |
| | | try: |
| | | if not tool.is_trade_time(): |
| | | continue |
| | | BlockInMoneyRankManager().compute() |
| | | in_list = BlockInMoneyRankManager().get_in_list() |
| | | out_list = BlockInMoneyRankManager().get_out_list() |
| | | # (代码,名称,强度,主力净额) |
| | | fins = [(0, x[0], 0, x[1]) for x in in_list[:100]] |
| | | fouts = [(0, x[0], 0, x[1]) for x in out_list[:50]] |
| | | # 上传 |
| | | __upload_data("jingxuan_rank", json.dumps(fins)) |
| | | __upload_data("jingxuan_rank_out", json.dumps(fouts)) |
| | | __upload_codes_in_money() |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |
| | | time.sleep(3) |
| | | |
| | | |
| | | def __update_today_limit_up_records(): |
| | | while True: |
| | | try: |
| | | BlockInMoneyRankManager().load_today_limit_up_codes() |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(3) |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | # 载入自由流通量 |
| | | global_data_loader.load_zyltgb_volume_from_db() |
| | | threading.Thread(target=__compute_and_upload, daemon=True).start() |
| | | # 启动内部接口服务 |
| | | threading.Thread(target=__run_server, args=("0.0.0.0", 9005,), daemon=True).start() |
| | | # 启用定时更新当日涨停 |
| | | threading.Thread(target=__update_today_limit_up_records, daemon=True).start() |
| | | run() |
| | | while True: |
| | | time.sleep(2) |