Administrator
5 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""
卖出规则管理
"""
import json
import threading
 
from db import mysql_data_delegate as mysql_data
from utils import tool
import concurrent.futures
 
 
class SellRule:
    def __init__(self, id_=None, code=None, buy1_volume=None, buy1_price=None, sell_volume=None, sell_price_type=None,
                 day=None, create_time=None, excuted=0,
                 end_time=None, type=None, ):
        self.day = day
        self.create_time = create_time
        self.sell_volume = sell_volume
        self.buy1_volume = buy1_volume
        self.buy1_price = buy1_price
        self.sell_price_type = sell_price_type
        self.code = code
        self.id_ = id_
        self.excuted = 0
        self.end_time = end_time
        # 0-买入 1-买撤 2-卖 3-卖撤
        self.type = type
 
    def to_json_str(self):
        return json.dumps(vars(self))
 
    def to_dict(self):
        return vars(self)
 
    @classmethod
    def to_object(cls, json_str: str):
        d = json.loads(json_str)
        return SellRule(**d)
 
    @property
    def id(self):
        return self.id_
 
 
class TradeRuleManager:
    # 买入
    TYPE_BUY = 0
    # 卖撤
    TYPE_BUY_CANCEL = 1
    # 卖
    TYPE_SELL = 2
    # 卖撤
    TYPE_SELL_CANCEL = 3
 
    __instance = None
    __sell_rules_dict_cache = {}
    # 卖出锁
    __sell_lock_dict = {}
    __mysql_excute_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
 
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradeRuleManager, cls).__new__(cls, *args, **kwargs)
            cls.__instance.__load_datas()
        return cls.__instance
 
    def __load_datas(self):
        rules = self.list_rules()
        self.__sell_rules_dict_cache.clear()
        for rule in rules:
            self.__sell_rules_dict_cache[rule.id_] = rule
 
    def list_rules(self, day=tool.get_now_date_str()):
        results = mysql_data.Mysqldb().select_all(f"select * from sell_rules r where r.day='{day}'")
        fresults = []
        if results:
            for r in results:
                rule = SellRule()
                rule.id_ = r[0]
                rule.code = r[1]
                rule.buy1_volume = r[2]
                rule.buy1_price = r[3]
                rule.sell_volume = r[4]
                rule.sell_price_type = r[5]
                rule.day = r[6]
                rule.create_time = r[7]
                rule.excuted = r[8]
                rule.end_time = r[9]
                rule.type = r[10]
                fresults.append(rule)
        return fresults
 
    # 添加规则
    def add_rule(self, rule: SellRule):
        _id = tool.get_now_date_str('%Y%m%d') + tool.get_now_time_str().replace(":", "") + "_" + rule.code
        if not rule.id:
            rule.id_ = _id
        if not rule.day:
            rule.day = tool.get_now_date_str()
        mysql_data.Mysqldb().execute(
            "insert into sell_rules(_id,code,buy1_volume,buy1_price,sell_volume,sell_price_type,day,create_time,excuted,end_time,type) values ('%s','%s','%s','%s','%s','%s','%s',now() ,'%s','%s','%s') " % (
                rule.id_, rule.code, rule.buy1_volume, rule.buy1_price, rule.sell_volume, rule.sell_price_type,
                rule.day, rule.excuted, rule.end_time, rule.type))
        self.__sell_rules_dict_cache[_id] = rule
 
    def update_rule(self, rule: SellRule):
        if not rule.id_:
            raise Exception("缺失id")
        # 获取原来的数据
        old_rule: SellRule = self.get_by_id(rule.id_)
        rule.day = old_rule.day
        mysql_data.Mysqldb().execute(
            "update sell_rules set code = '%s',buy1_volume = '%s', buy1_price='%s',sell_volume = '%s',sell_price_type='%s',end_time = '%s' where _id='%s'" % (
                rule.code, rule.buy1_volume, rule.buy1_price, rule.sell_volume, rule.sell_price_type, rule.end_time,
                rule.id_))
        rule.excuted = old_rule.excuted
        rule.type = old_rule.type
        rule.create_time = old_rule.create_time
 
        self.__sell_rules_dict_cache[rule.id_] = rule
 
    # 删除规则
    def del_rule(self, _id):
        mysql_data.Mysqldb().execute(f"delete from sell_rules where _id='{_id}'")
        if _id in self.__sell_rules_dict_cache:
            self.__sell_rules_dict_cache.pop(_id)
 
    def list_rules_cache(self):
        return [self.__sell_rules_dict_cache[k] for k in self.__sell_rules_dict_cache]
 
    # 获取可以执行的规则
    def list_can_excut_rules_cache(self, types=None, code=None):
        if not types:
            types = [self.TYPE_BUY, self.TYPE_BUY_CANCEL, self.TYPE_SELL, self.TYPE_SELL_CANCEL]
        rules = []
        for k in self.__sell_rules_dict_cache:
            rule = self.__sell_rules_dict_cache[k]
            if code and code != rule.code:
                continue
            if rule.excuted == 0 and rule.type in types and rule.day == tool.get_now_date_str() and tool.trade_time_sub(
                    rule.end_time,
                    tool.get_now_time_str()) > 0:
                rules.append(rule)
        return rules
 
    # 执行卖
    def excuted(self, _id):
        if _id in self.__sell_rules_dict_cache:
            self.__sell_rules_dict_cache[_id].excuted = 1
        self.__mysql_excute_thread_pool.submit(mysql_data.Mysqldb().execute,
                                               f"update sell_rules r set r.excuted=1 where r._id='{_id}'")
 
    # 请求卖出锁
    def require_sell_lock(self, _id):
        if _id not in self.__sell_lock_dict:
            self.__sell_lock_dict[_id] = threading.RLock()
        self.__sell_lock_dict[_id].acquire()
 
    # 释放卖出锁
    def release_sell_lock(self, _id):
        if _id in self.__sell_lock_dict:
            self.__sell_lock_dict[_id].release()
 
    # 根据ID获取内容
    def get_by_id(self, _id):
        return self.__sell_rules_dict_cache.get(_id)
 
 
if __name__ == "__main__":
    TradeRuleManager().list_rules("2023-12-01")