| | |
| | | import json |
| | | |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from log_module import async_log_util |
| | | from utils import global_util, tool |
| | | from db import redis_manager_delegate as redis_manager |
| | | from log_module.log import logger_day_volumn |
| | |
| | | |
| | | # 设置今日量 |
| | | def set_today_volumn(code, volumn): |
| | | logger_day_volumn.info("code:{} volumn:{}".format(code, volumn)) |
| | | async_log_util.info(logger_day_volumn, "code:{} volumn:{}".format(code, volumn)) |
| | | global_util.today_volumn[code] = volumn |
| | | # 有1000手的变化才保存 |
| | | if code in __today_volumn_cache and volumn - __today_volumn_cache[code] < 100000: |
| | |
| | | def set_today_volumns(datas): |
| | | for d in datas: |
| | | code, volumn = d |
| | | logger_day_volumn.info("code:{} volumn:{}".format(code, volumn)) |
| | | async_log_util.info(logger_day_volumn, "code:{} volumn:{}".format(code, volumn)) |
| | | global_util.today_volumn[code] = volumn |
| | | # 有1000手的变化才保存 |
| | | if code in __today_volumn_cache and volumn - __today_volumn_cache[code] < 100000: |
| | |
| | | from code_attribute import global_data_loader, code_volumn_manager, first_target_code_data_processor |
| | | from code_attribute.code_data_util import ZYLTGBUtil |
| | | from db import redis_manager_delegate as redis_manager |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_l2_codes_subscript |
| | | from third_data import kpl_data_manager, kpl_api |
| | | from trade import current_price_process_manager |
| | |
| | | l2_codes_queue.clear() |
| | | |
@classmethod
def push(cls, datas, request_id=None):
    """Enqueue one batch of L2 code data without blocking.

    Args:
        datas: The batch to enqueue; only its length is logged here.
        request_id: Optional identifier stored alongside the batch.
    """
    # Each queue entry is (enqueue time in whole seconds, payload, request id).
    l2_codes_queue.put_nowait((int(time.time()), datas, request_id))
    # NOTE(review): the "{}" placeholder assumes the logger formats positional
    # arguments str.format-style (as loguru does) - confirm against log_module.
    logger_l2_codes_subscript.info("加入L2代码处理队列:数量-{}", len(datas))
    # cls.__get_redis().lpush(cls.__L2_CODE_KEY, json.dumps())
| | | |
| | |
| | | |
| | | @classmethod |
| | | def set_level_1_codes_datas(cls, datas, request_id=None): |
| | | logger_l2_codes_subscript.info(f"({request_id})接受到L1的数据,开始预处理") |
| | | async_log_util.info(logger_l2_codes_subscript, f"({request_id})接受到L1的数据,开始预处理") |
| | | yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() |
| | | # 订阅的代码 |
| | | flist = [] |
| | |
| | | continue |
| | | # 获取自由流通市值 |
| | | if code not in global_util.zyltgb_map: |
| | | __start_time = time.time() |
| | | zylt = kpl_api.getZYLTAmount(code) |
| | | async_log_util.info(logger_l2_codes_subscript, |
| | | f"{request_id} {code}获取自由流通市值耗时-{round((time.time() - __start_time) * 1000)}ms") |
| | | if zylt: |
| | | # 保存自由流通股本 |
| | | ZYLTGBUtil.save_async(code, zylt // 10000, 1) |
| | |
| | | "zyltgb": zyltgb // 10000, "zyltgbUnit": 1} |
| | | flist.append(fitem) |
| | | code_volumn_manager.set_today_volumns(temp_volumns) |
| | | logger_l2_codes_subscript.info(f"{request_id}接受到L1的数据,预处理完成") |
| | | async_log_util.info(logger_l2_codes_subscript, f"{request_id}接受到L1的数据,预处理完成") |
| | | try: |
| | | tick_datas = first_target_code_data_processor.process_first_codes_datas(flist, request_id) |
| | | current_price_process_manager.accept_prices(tick_datas,request_id) |
| | | current_price_process_manager.accept_prices(tick_datas, request_id) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import queue |
| | | import random |
| | | import threading |
| | | import time |
| | |
| | | from utils import socket_util, huaxin_util, tool |
| | | |
| | | __response_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=15) |
| | | __save_data_queue = queue.Queue() |
| | | |
| | | |
def __run_save_data():
    """Drain ``__save_data_queue`` forever, persisting each queued record.

    Blocks on ``__save_data_queue.get()`` and passes every item to
    ``huaxin_trade_record_manager.DelegateRecordManager.add_one``. The loop
    never returns, so it is meant to be run on its own thread.
    """
    while True:
        try:
            data = __save_data_queue.get()
            huaxin_trade_record_manager.DelegateRecordManager.add_one(data)
        except Exception as e:
            # Persisting is best effort: one bad record must not stop the
            # loop, but the failure is logged instead of silently dropped.
            logging.exception(e)
        finally:
            # Short pause after every item, whether or not it was saved.
            time.sleep(0.1)
| | | |
| | | |
| | | def __run_recv_queue_trade(queue: multiprocessing.Queue): |
| | |
| | | async_log_util.info(hx_logger_trade_callback, |
| | | f"response:request_id-{request_id}") |
| | | __response_thread_pool.submit(__set_response, data_json) |
| | | if request_id.find("trade") > 0: |
| | | data = data_json["data"].get("data") |
| | | if data: |
| | | __response_thread_pool.submit(huaxin_trade_record_manager.DelegateRecordManager.add_one, |
| | | data) |
| | | elif type_ == "trade_callback": |
| | | try: |
| | | # 交易回调 |
| | |
| | | direction = data.get("direction") |
| | | limitPrice = data.get("limitPrice") |
| | | volume = data.get("volume") |
| | | async_log_util.info(hx_logger_trade_callback, f"华鑫订单状态上报:code-{code} orderRef-{orderRef} orderSysID-{orderSysID} orderStatus-{orderStatus}") |
| | | is_shadow_order = False |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | |
| | | try: |
| | | TradeResultProcessor.process_order(order) |
| | | finally: |
| | | |
| | | try: |
| | | # 加入2次,增大加入成功率 |
| | | __response_thread_pool.submit( |
| | | huaxin_trade_record_manager.DelegateRecordManager.add_one, data) |
| | | async_log_util.info(hx_logger_trade_callback, |
| | | f"华鑫订单状态处理完成:code-{code} orderRef-{orderRef} orderSysID-{orderSysID} orderStatus-{orderStatus}") |
| | | __save_data_queue.put_nowait(data) |
| | | except Exception as e: |
| | | hx_logger_trade_debug.exception(e) |
| | | |
| | |
| | | queue_strategy_w_trade_r = queue_strategy_w_trade_r_ |
| | | t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: __run_save_data(), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: CancelOrderManager().run(cancel_order), daemon=True) |
| | | t1.start() |
| | | |