| | |
| | | import json |
| | | import logging |
| | | import random |
| | | import time |
| | | import unittest |
| | | from copy import deepcopy |
| | | from unittest import mock |
| | |
| | | import log |
| | | import tool |
| | | from db import redis_manager |
| | | from l2 import l2_log, l2_data_manager, transaction_progress |
| | | from l2.safe_count_manager import BuyL2SafeCountManager |
| | | from l2.transaction_progress import TradeBuyQueue |
| | | from trade import trade_data_manager |
| | |
| | | redis_info.delete(k) |
| | | BuyL2SafeCountManager().clear_data(code) |
| | | |
| | | transaction_progress.TradeBuyQueue().set_traded_index(code, 0) |
| | | |
| | | |
| | | class VirtualTrade(unittest.TestCase): |
| | | |
| | | def __process_buy_queue(self,code, buy_queue, time_): |
| | | if time_ == "09:32:37": |
| | | def __process_buy_queue(self, code, buy_queue, time_): |
| | | if time_ == "10:25:14": |
| | | print("进入调试") |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | buy_one_price = limit_up_price |
| | |
| | | try: |
| | | buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( |
| | | decimal.Decimal("0.00")) |
| | | buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_, buy_queue_result_list) |
| | | # 获取执行位时间 |
| | | exec_time = None |
| | | buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data( |
| | | code) |
| | | if buy_exec_index: |
| | | try: |
| | | exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"]["time"] |
| | | except: |
| | | pass |
| | | buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_, |
| | | buy_queue_result_list, exec_time) |
| | | if buy_progress_index is not None: |
| | | l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index, |
| | | l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, time_, buy_exec_index, buy_progress_index, |
| | | l2.l2_data_util.local_today_datas.get( |
| | | code), |
| | | l2.l2_data_util.local_today_num_operate_map.get( |
| | | code)) |
| | | log.logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code, |
| | | buy_progress_index, |
| | | json.dumps(buy_queue_result_list)) |
| | | log.logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code, |
| | | buy_progress_index, |
| | | json.dumps(buy_queue_result_list)) |
| | | except Exception as e: |
| | | pass |
| | | |
| | | # @unittest.skip("跳过此单元测试") |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_trade(self): |
| | | code = "002131" |
| | | code = "002117" |
| | | clear_trade_data(code) |
| | | l2.l2_data_util.load_l2_data(code) |
| | | total_datas = deepcopy(l2.l2_data_util.local_today_datas[code]) |
| | |
| | | trade_progress_list, buy_queues = log.get_trade_progress(code) |
| | | |
| | | for indexs in pos_list: |
| | | l2.l2_data_manager_new.L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000)) |
| | | l2_log.threadIds[code] = mock.Mock( |
| | | return_value=random.randint(0, 100000)) |
| | | # 设置封单额,获取买1量 |
| | | for i in range(0, 100): |
| | | time_ = total_datas[indexs[0]]["val"]["time"] |
| | |
| | | break |
| | | |
| | | print("----------------处理位置", indexs) |
| | | if indexs[0] >= 224: |
| | | if indexs[0] >= 661: |
| | | print("进入调试") |
| | | l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0) |
| | | l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, |
| | | 0) |
| | | |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_h_cancel(self): |
| | | code = "002870" |
| | | l2.l2_data_util.load_l2_data(code) |
| | | total_datas = l2.l2_data_util.local_today_datas.get(code) |
| | | total_datas = total_datas[:899] |
| | | l2.l2_data_util.local_today_datas[code] = total_datas |
| | | l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True) |
| | | |
| | | buy_progress_index = 523 |
| | | l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index, |
| | | l2.l2_data_util.local_today_datas.get( |
| | | code), |
| | | l2.l2_data_util.local_today_num_operate_map.get( |
| | | code)) |
| | | |
| | | |
| | | # class TestTrade(unittest.TestCase): |
| | |
| | | class TestTradedProgress(unittest.TestCase): |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_get_progress(self): |
| | | code = "002328" |
| | | code = "000925" |
| | | l2.l2_data_util.load_l2_data(code) |
| | | # l2.l2_data_util.local_today_datas[code] = l2.l2_data_util.local_today_datas[code][:898] |
| | | |
| | | TradeBuyQueue.get_traded_index = mock.Mock(return_value=(10, False)) |
| | | buy_progress_index = TradeBuyQueue().compute_traded_index(code, "6.94", [1270, 9999, 1973]) |
| | | buy_progress_index = TradeBuyQueue().compute_traded_index(code, "9.76", |
| | | [9999, 1506], "09:32:45") |
| | | print("获取到交易进度:", buy_progress_index) |
| | | |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_sort(self): |
| | | list = [1, 2, 3] |
| | | result_list = itertools.permutations(list, 3) |
| | | print(result_list) |
| | | for r in result_list: |
| | | print(r) |
| | | _start = round(time.time() * 1000) |
| | | count = 0 |
| | | for i in range(0, 100000): |
| | | # if i > 30: |
| | | count += 1 |
| | | print("耗时", round(time.time() * 1000) - _start) |
| | | |
| | | |
| | | if __name__ == "__main__": |