Administrator
2023-12-01 95e52fd8db1d2801110cc84f1e4bd42a546a4a5c
修改卖相关功能
1 文件已重命名
7个文件已修改
3个文件已添加
301 ■■■■ 已修改文件
test/test_code_attribute.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_sell.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_block_util.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 153 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/sell/sell_manager.py 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/sell/sell_rule_manager.py 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_huaxin.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_juejin.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/sell_util.py 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_code_attribute.py
@@ -1,5 +1,5 @@
from code_attribute import code_nature_analyse
from utils import init_data_util
from utils import init_data_util, tool
def is_too_high(datas):
@@ -13,7 +13,7 @@
    code_str = "600148"
    codes = code_str.split(",")
    for code in codes:
        if code.find("00") != 0 and code.find("60") != 0:
        if not tool.is_shsz_code(code):
            continue
        try:
            datas = init_data_util.get_volumns_by_code(code, 120)
test/test_sell.py
New file
@@ -0,0 +1,4 @@
from trade.huaxin import huaxin_trade_server
# Manual entry point: when this file is executed directly (not imported), run
# TradeServerProcessor.test_sell() from the huaxin trade server module.
if __name__ == '__main__':
    huaxin_trade_server.TradeServerProcessor.test_sell()
third_data/code_plate_key_manager.py
@@ -518,7 +518,7 @@
                max_rank = 3
                msg_list.append(f"{bc}高位板")
                break
            elif bc.find("00") != 0 and bc.find("60") != 0:
            elif not tool.is_shsz_code(bc):
                max_rank = 3
                msg_list.append(f"{bc}创业板/科创板")
                break
third_data/data_server.py
@@ -560,7 +560,7 @@
                    code = d[0]
                    limit_up_reasons[code] = d[5]
                    codes_set.add(code)
                    if code.find("00") == 0 or code.find("60") == 0:
                    if tool.is_shsz_code(code):
                        limit_up_time = time.strftime("%H:%M:%S", time.localtime(d[2]))
                        code_price_manager.Buy1PriceManager().set_limit_up_time(code, limit_up_time)
                add_codes = codes_set - self.__latest_limit_up_codes_set
@@ -579,7 +579,7 @@
                if add_codes:
                    for code in add_codes:
                        # 根据涨停原因判断是否可以买
                        if code.find("00") == 0 or code.find("60") == 0:
                        if tool.is_shsz_code(code):
                            try:
                                # 判断是否下单
                                trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
third_data/kpl_block_util.py
@@ -145,7 +145,7 @@
        if k[3] == code:
            # 获取当前代码涨停时间
            limit_up_time = int(k[5])
        if shsz and k[3].find("00") != 0 and k[3].find("60") != 0:
        if shsz and not tool.is_shsz_code(k[3]):
            continue
        # 剔除高位板
        if k[3] in yesterday_current_limit_up_codes:
@@ -175,7 +175,7 @@
        if k[0] == code:
            # 获取当前代码涨停时间
            limit_up_time = int(k[2])
        if shsz and k[0].find("00") != 0 and k[0].find("60") != 0:
        if shsz and not tool.is_shsz_code(k[0]):
            continue
        # 剔除高位板
        if k[0] in yesterday_current_limit_up_codes:
trade/huaxin/huaxin_trade_server.py
@@ -1,12 +1,9 @@
import concurrent.futures
import contextlib
import copy
import datetime
import hashlib
import io
import json
import logging
import mmap
import multiprocessing
import queue
import random
@@ -15,13 +12,11 @@
import threading
import time
import dask
import numpy
import psutil
import requests
import huaxin_client.constant
from line_profiler import LineProfiler
import constant
import inited_data
@@ -34,27 +29,26 @@
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.trade_transform_protocol import TradeResponse
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress, \
    l2_data_log, l2_data_source_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \
    GCancelBigNumComputer, SecondCancelBigNumComputer, LCancelRateManager, LatestCancelIndexManager, \
    l2_data_source_util
from l2.cancel_buy_strategy import LCancelBigNumComputer, GCancelBigNumComputer, SecondCancelBigNumComputer, \
    LCancelRateManager, \
    UCancelBigNumComputer
from l2.huaxin import huaxin_target_codes_manager
from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager
from l2.l2_data_listen_manager import L2DataListenManager
from l2.l2_data_manager_new import L2TradeDataProcessor
from l2.l2_data_util import L2DataUtil
from l2.l2_sell_manager import L2MarketSellManager
from l2.l2_transaction_data_manager import HuaXinTransactionDatasProcessor
from log_module import async_log_util, log_export
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue, \
    logger_l2_g_cancel, logger_debug, logger_system, logger_trade, logger_l2_process
from third_data import block_info, kpl_api, kpl_data_manager
from log_module.log import hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \
    logger_system, logger_trade
from third_data import block_info, kpl_data_manager
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
from trade import deal_big_money_manager, current_price_process_manager, trade_huaxin, trade_manager, l2_trade_util, \
from trade import trade_manager, l2_trade_util, \
    l2_trade_factor
from trade.deal_big_money_manager import DealOrderNoManager
@@ -62,9 +56,10 @@
    huaxin_trade_record_manager, huaxin_trade_order_processor
from trade.huaxin.huaxin_trade_record_manager import PositionManager
from trade.l2_trade_factor import L2PlaceOrderParamsManager
from trade.sell_rule_manager import SellRuleManager, SellRule
from trade.sell import sell_manager
from trade.sell.sell_rule_manager import SellRuleManager, SellRule
from trade.trade_manager import TradeTargetCodeModeManager
from utils import socket_util, data_export_util, middle_api_protocol, tool, huaxin_util, output_util
from utils import socket_util, data_export_util, middle_api_protocol, tool, huaxin_util, output_util, sell_util
trade_data_request_queue = queue.Queue()
@@ -240,7 +235,7 @@
                        list_ = JueJinApi.get_exchanges_codes(["SHSE", "SZSE"])
                        fdatas = []
                        for d in list_:
                            if d["sec_id"].find("60") != 0 and d["sec_id"].find("00") != 0:
                            if not tool.is_shsz_code(d["sec_id"]):
                                continue
                            if d["sec_level"] != 1:
                                continue
@@ -295,6 +290,7 @@
    @classmethod
    def __sell(cls, datas):
        rules = SellRuleManager().list_can_excut_rules_cache()
        excuted_rule_ids = set()
        if rules:
            for d in datas:
                code = d[0]
@@ -303,10 +299,44 @@
                buy1_price = d[5]
                if buy1_volume:
                    for r in rules:
                        if r.code == code and r.buy1_volume >= buy1_volume:
                            # 提交卖
                            huaxin_trade_api.order(2, code, r.sell_volume, tool.get_buy_min_price(buy1_price))
                            SellRuleManager().excute_sell(r.id_)
                        # 生效时间
                        if r.code == code:
                            # --------判断是否可以执行--------
                            can_excute = False
                            if round(float(buy1_price), 2) <= round(float(r.buy1_price), 2):
                                # 价格已经触发
                                if r.buy1_volume:
                                    if r.buy1_volume >= buy1_volume:
                                        # 量价触发
                                        can_excute = True
                                else:
                                    can_excute = True
                                    # 价格触发
                                # 获取价格类型
                                if not can_excute:
                                    continue
                                # 请求卖出锁
                                SellRuleManager().require_sell_lock(r.id_)
                                try:
                                    if r.id_ in excuted_rule_ids:
                                        continue
                                    excuted_rule_ids.add(r.id_)
                                    # 获取最新的执行状况
                                    r = SellRuleManager().get_by_id(r.id_)
                                    if r.excuted:
                                        continue
                                    # 提交卖
                                    limit_down_price = gpcode_manager.get_limit_down_price(code)
                                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                                    sell_manager.start_sell(code, r.sell_volume, r.sell_price_type, limit_up_price,
                                                            limit_down_price,
                                                            buy1_price)
                                    SellRuleManager().excute_sell(r.id_)
                                except:
                                    pass
                                finally:
                                    SellRuleManager().release_sell_lock(r.id_)
    # 保存现价
    @classmethod
@@ -463,6 +493,13 @@
        except Exception as e:
            async_log_util.error(logger_l2_g_cancel, f"{code}-撤单异常:{str(e)}")
    @classmethod
    def test_sell(cls):
        # (代码, 现价, 涨幅, 量, 更新时间, 买1价格, 买1量)
        datas = [("600571", 12.14, 9.96, 100000000, tool.get_now_time_str(), 12.14, 10210),
                 ("600571", 12.04, 9.96, 100000000, tool.get_now_time_str(), 12.04, 10210)]
        cls.__sell(datas)
def clear_invalid_client():
    logger_system.info(f"trade_server clear_invalid_client 线程ID:{tool.get_thread_id()}")
@@ -524,20 +561,6 @@
            except Exception as e1:
                logging.exception(e1)
    # 撤卖单
    def __cancel_sell_order(self, code, order_ref):
        for i in range(0, 10):
            time.sleep(0.2)
            order_entity = huaxin_trade_order_processor.TradeResultProcessor.get_huaxin_order_by_order_ref(order_ref)
            if order_entity:
                if order_entity.orderStatus == huaxin_util.TORA_TSTP_OST_AllTraded:
                    # 成交的就不需要撤单了
                    return
        try:
            result = huaxin_trade_api.cancel_order(2, code, None, orderRef=order_ref)
        except Exception as e:
            logger_trade.exception(e)
    # 交易
    def OnTrade(self, client_id, request_id, data):
        try:
@@ -552,39 +575,11 @@
                if direction == 2:
                    # price_type: 0-价格笼子 1-跌停价  2-涨停价 3-现价 4-买5价
                    async_log_util.info(logger_trade, f"API卖: 接收数据-{data}")
                    # 获取现价
                    if price_type == 0:
                        current_price = TradeServerProcessor.get_l1_current_price(code)
                        if not current_price:
                            raise Exception("没有获取到L1现价")
                        price = tool.get_buy_min_price(current_price)
                    elif price_type == 1:
                        price_ = gpcode_manager.get_limit_down_price(code)
                        if not price_:
                            raise Exception("没有获取到跌停价")
                        price = round(float(price_), 2)
                    elif price_type == 2:
                        price_ = gpcode_manager.get_limit_up_price(code)
                        if not price_:
                            raise Exception("没有获取到涨停价")
                        price = round(float(price_), 2)
                    elif price_type == 3:
                        current_price = TradeServerProcessor.get_l1_current_price(code)
                        if not current_price:
                            raise Exception("没有获取到L1现价")
                        price = current_price
                    elif price_type == 4:
                        current_price = TradeServerProcessor.get_l1_current_price(code)
                        if not current_price:
                            raise Exception("没有获取到L1现价")
                        price = round(float(current_price) - 0.05, 2)
                    async_log_util.info(logger_trade, f"API卖:  单价-{price}")
                    order_ref = huaxin_util.create_order_ref()
                    result = huaxin_trade_api.order(direction, code, volume, price, sinfo=sinfo, order_ref=order_ref,
                                                    blocking=True, request_id=request_id)
                    # 如果是在正常交易时间提交的2s之内还未成交的需要撤单
                    if int("092958") <= int(tool.get_now_time_str().replace(":", "")) <= int("150000"):
                        self.__cancel_sell_thread_pool.submit(lambda: self.__cancel_sell_order(code, order_ref))
                    current_price = TradeServerProcessor.get_l1_current_price(code)
                    limit_down_price = gpcode_manager.get_limit_down_price(code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    result = sell_manager.start_sell(code, volume, price_type, limit_up_price, limit_down_price,
                                                     current_price, blocking=True, request_id=request_id)
                    self.send_response(result, client_id, request_id)
                else:
                    result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type, sinfo=sinfo,
@@ -1030,10 +1025,30 @@
    def OnGetCodePositionInfo(self, client_id, request_id, data):
        try:
            code = data["code"]
            if not tool.is_shsz_code(code):
                raise Exception("非主板代码")
            # 获取代码基本信息
            # 查询是否想买单/白名单/黑名单/暂不买
            code_name = gpcode_manager.get_code_name(code)
            want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
            white = l2_trade_util.WhiteListCodeManager().is_in_cache(code)
            black = l2_trade_util.is_in_forbidden_trade_codes(code)
            pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
            desc_list = []
            if want:
                desc_list.append("【想买单】")
            if white:
                desc_list.append("【白名单】")
            if black:
                desc_list.append("【黑名单】")
            if pause_buy:
                desc_list.append("【暂不买】")
            # 获取持仓
            positions = PositionManager.latest_positions
            sell_rules_count = len(SellRuleManager().list_can_excut_rules_cache())
            fdata = {"code": code, "total": 0, "available": 0, "sell_rules_count": sell_rules_count}
            fdata = {"code": code, "total": 0, "available": 0, "sell_rules_count": sell_rules_count,
                     "code_info": (code, code_name), "desc": "".join(desc_list)}
            if positions:
                for d in positions:
                    code_name = gpcode_manager.get_code_name(d["securityID"])
trade/sell/sell_manager.py
New file
@@ -0,0 +1,43 @@
"""
卖票管理器
"""
import concurrent.futures
import time
from log_module import async_log_util
from log_module.log import logger_trade
from trade.huaxin import huaxin_trade_order_processor, huaxin_trade_api
from utils import huaxin_util, sell_util, tool
__cancel_sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
# Cancel a sell order that has not fully traded
def __cancel_sell_order(code, order_ref):
    """Poll the order identified by ``order_ref`` and cancel it if it stays open.

    The order is looked up 10 times, sleeping 0.2 s before each lookup. If any
    lookup reports the all-traded status the function returns without
    cancelling. Otherwise a cancel request is sent; a failure of that request
    is logged to ``logger_trade`` and not re-raised.
    """
    for _ in range(10):
        time.sleep(0.2)
        order_entity = huaxin_trade_order_processor.TradeResultProcessor.get_huaxin_order_by_order_ref(order_ref)
        if order_entity and order_entity.orderStatus == huaxin_util.TORA_TSTP_OST_AllTraded:
            # Fully traded: nothing left to cancel.
            return
    try:
        huaxin_trade_api.cancel_order(2, code, None, orderRef=order_ref)
    except Exception as e:
        logger_trade.exception(e)
# Place a sell order and schedule its follow-up cancel
def start_sell(code, volume, price_type, limit_up_price, limit_down_price, current_price, blocking=False,
               request_id=None):
    """Resolve the sell price, submit the sell order and return the order result.

    The price is computed by ``sell_util.get_sell_price`` from ``price_type``
    and the three reference prices. The order is submitted through
    ``huaxin_trade_api.order`` with direction code 2 and a freshly created
    order reference; ``blocking`` and ``request_id`` are forwarded unchanged.

    When the current time falls inside 09:29:58-14:56:55 (inclusive), a
    background task is queued that cancels the order if it has not fully
    traded after roughly two seconds of polling.
    NOTE(review): the window check assumes ``tool.get_now_time_str()`` returns
    "HH:MM:SS" - confirm.

    Raises:
        Exception: if no usable price could be resolved.
    """
    price = sell_util.get_sell_price(price_type, limit_up_price, limit_down_price, current_price)
    if not price:
        raise Exception("价格获取出错")
    async_log_util.info(logger_trade, f"API卖:  单价-{price}")

    order_ref = huaxin_util.create_order_ref()
    order_result = huaxin_trade_api.order(2, code, volume, price, order_ref=order_ref,
                                          blocking=blocking, request_id=request_id)

    # Only orders placed during the trading window get the delayed cancel.
    now_hhmmss = int(tool.get_now_time_str().replace(":", ""))
    if 92958 <= now_hhmmss <= 145655:
        __cancel_sell_thread_pool.submit(__cancel_sell_order, code, order_ref)
    return order_result
trade/sell/sell_rule_manager.py
File was renamed from trade/sell_rule_manager.py
@@ -2,17 +2,23 @@
卖出规则管理
"""
import json
import threading
from db import mysql_data_delegate as mysql_data
from utils import tool
import concurrent.futures
class SellRule:
    def __init__(self, id_=None, code=None, buy1_volume=None, sell_volume=None, day=None, create_time=None, excuted=0,
    def __init__(self, id_=None, code=None, buy1_volume=None, buy1_price=None, sell_volume=None, sell_price_type=None,
                 day=None, create_time=None, excuted=0,
                 end_time=None):
        self.day = day
        self.create_time = create_time
        self.sell_volume = sell_volume
        self.buy1_volume = buy1_volume
        self.buy1_price = buy1_price
        self.sell_price_type = sell_price_type
        self.code = code
        self.id_ = id_
        self.excuted = 0
@@ -37,6 +43,11 @@
class SellRuleManager:
    __instance = None
    __sell_rules_dict_cache = {}
    # 卖出锁
    __sell_lock_dict = {}
    __mysql_excute_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -59,11 +70,13 @@
                rule.id_ = r[0]
                rule.code = r[1]
                rule.buy1_volume = r[2]
                rule.sell_volume = r[3]
                rule.day = r[4]
                rule.create_time = r[5]
                rule.excuted = r[6]
                rule.end_time = r[7]
                rule.buy1_price = r[3]
                rule.sell_volume = r[4]
                rule.sell_price_type = r[5]
                rule.day = r[6]
                rule.create_time = r[7]
                rule.excuted = r[8]
                rule.end_time = r[9]
                fresults.append(rule)
        return fresults
@@ -75,8 +88,9 @@
        if not rule.day:
            rule.day = tool.get_now_date_str()
        mysql_data.Mysqldb().execute(
            "insert into sell_rules(_id,code,buy1_volume,sell_volume,day,create_time,excuted,end_time) values ('%s','%s','%s','%s','%s',now() ,'%s','%s') " % (
                rule.id_, rule.code, rule.buy1_volume, rule.sell_volume, rule.day, rule.excuted, rule.end_time))
            "insert into sell_rules(_id,code,buy1_volume,buy1_price,sell_volume,sell_price_type,day,create_time,excuted,end_time) values ('%s','%s','%s','%s','%s','%s','%s',now() ,'%s','%s') " % (
                rule.id_, rule.code, rule.buy1_volume, rule.buy1_price, rule.sell_volume, rule.sell_price_type,
                rule.day, rule.excuted, rule.end_time))
        self.__sell_rules_dict_cache[_id] = rule
    # 删除规则
@@ -104,9 +118,23 @@
    def excute_sell(self, _id):
        if _id in self.__sell_rules_dict_cache:
            self.__sell_rules_dict_cache[_id].excuted = 1
        mysql_data.Mysqldb().execute(f"update sell_rules r set r.excuted=1 where r._id='{_id}'")
        self.__mysql_excute_thread_pool.submit(mysql_data.Mysqldb().execute, f"update sell_rules r set r.excuted=1 where r._id='{_id}'")
    # 请求卖出锁
    def require_sell_lock(self, _id):
        if _id not in self.__sell_lock_dict:
            self.__sell_lock_dict[_id] = threading.RLock()
        self.__sell_lock_dict[_id].acquire()
    # 释放卖出锁
    def release_sell_lock(self, _id):
        if _id in self.__sell_lock_dict:
            self.__sell_lock_dict[_id].release()
    # 根据ID获取内容
    def get_by_id(self, _id):
        return self.__sell_rules_dict_cache.get(_id)
if __name__ == "__main__":
    SellRuleManager().del_rule("20231123142639_000333")
    SellRuleManager().excute_sell("20231123142919_000333")
    SellRuleManager().list_rules("2023-12-01")
trade/trade_huaxin.py
@@ -37,7 +37,7 @@
    async_log_util.info(logger_trade, f"{code} trade_huaxin.order_volume 开始")
    try:
        price = round(float(price), 2)
        if code.find("00") != 0 and code.find("60") != 0:
        if not tool.is_shsz_code(code):
            raise Exception("只支持00开头与60开头的代码下单")
        # 保存下单信息
        shadow_price = tool.get_shadow_price(price)
trade/trade_juejin.py
@@ -66,7 +66,7 @@
def order_volume(code, price, count):
    if not constant.TRADE_ENABLE:
        return
    if code.find("00") != 0 and code.find("60") != 0:
    if not tool.is_shsz_code(code):
        raise Exception("只支持00开头与60开头的代码下单")
    code_str = code
    if code[0:2] == '00':
utils/sell_util.py
New file
@@ -0,0 +1,33 @@
"""
卖票相关类
"""
# 获取卖价
from utils import tool
def get_sell_price(price_type, limit_up_price, limit_down_price, current_price):
    """Resolve the order price for a sell request.

    Supported ``price_type`` values:
        0 - price derived from the current price by ``tool.get_buy_min_price``
        1 - limit-down price
        2 - limit-up price
        3 - current price
        4 - current price minus 0.05

    Types 1-4 return a float rounded to two decimals. Type 3 used to return
    ``current_price`` untouched, so a string price came back as a string; it
    is now normalised like the other branches. Type 0 returns whatever
    ``tool.get_buy_min_price`` yields.

    Raises:
        Exception: if the reference price needed by the chosen type is missing
            (falsy), or if ``price_type`` is not one of the values above.
    """
    if price_type in (0, 3, 4):
        # All three are derived from the current price.
        if not current_price:
            raise Exception("没有获取到L1现价")
        if price_type == 0:
            return tool.get_buy_min_price(current_price)
        if price_type == 3:
            return round(float(current_price), 2)
        return round(float(current_price) - 0.05, 2)
    if price_type == 1:
        if not limit_down_price:
            raise Exception("没有获取到跌停价")
        return round(float(limit_down_price), 2)
    if price_type == 2:
        if not limit_up_price:
            raise Exception("没有获取到涨停价")
        return round(float(limit_up_price), 2)
    raise Exception("价格类型错误")