-
Notifications
You must be signed in to change notification settings - Fork 265
/
configdb.h
270 lines (225 loc) · 11.4 KB
/
configdb.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
#pragma once
#include <string>
#include <map>
#include "sonicv2connector.h"
#include "redistran.h"
namespace swss {
class ConfigDBConnector_Native : public SonicV2Connector_Native
{
public:
static constexpr const char *INIT_INDICATOR = "CONFIG_DB_INITIALIZED";
ConfigDBConnector_Native(bool use_unix_socket_path = false, const char *netns = "");
void db_connect(std::string db_name, bool wait_for_init = false, bool retry_on = false);
void connect(bool wait_for_init = true, bool retry_on = false);
virtual void set_entry(std::string table, std::string key, const std::map<std::string, std::string>& data);
virtual void mod_entry(std::string table, std::string key, const std::map<std::string, std::string>& data);
std::map<std::string, std::string> get_entry(std::string table, std::string key);
std::vector<std::string> get_keys(std::string table, bool split = true);
std::map<std::string, std::map<std::string, std::string>> get_table(std::string table);
void delete_table(std::string table);
virtual void mod_config(const std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>& data);
virtual std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> get_config();
std::string getKeySeparator() const;
std::string getTableNameSeparator() const;
std::string getDbName() const;
protected:
std::string m_table_name_separator = "|";
std::string m_key_separator = "|";
std::string m_db_name;
};
#ifdef SWIG
%pythoncode %{
## Note: diamond inheritance, reusing functions in both classes
class ConfigDBConnector(SonicV2Connector, ConfigDBConnector_Native):
## Note: there is no easy way for SWIG to map ctor parameter netns(C++) to namespace(python)
def __init__(self, use_unix_socket_path = False, namespace = '', **kwargs):
if 'decode_responses' in kwargs and kwargs.pop('decode_responses') != True:
raise ValueError('decode_responses must be True if specified, False is not supported')
if namespace is None:
namespace = ''
super(ConfigDBConnector, self).__init__(use_unix_socket_path = use_unix_socket_path, namespace = namespace)
# Trick: to achieve static/instance method "overload", we must use initize the function in ctor
# ref: https://stackoverflow.com/a/28766809/2514803
self.serialize_key = self._serialize_key
self.deserialize_key = self._deserialize_key
## Note: callback is difficult to implement by SWIG C++, so keep in python
self.handlers = {}
self.fire_init_data = {}
@property
def KEY_SEPARATOR(self):
return self.getKeySeparator()
@property
def TABLE_NAME_SEPARATOR(self):
return self.getTableNameSeparator()
@property
def db_name(self):
return self.getDbName()
## Note: callback is difficult to implement by SWIG C++, so keep in python
def listen(self, init_data_handler=None):
## Start listen Redis keyspace event. Pass a callback function to `init` to handle initial table data.
self.pubsub = self.get_redis_client(self.db_name).pubsub()
self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name)))
# Build a cache of data for all subscribed tables that will recieve the initial table data so we dont send duplicate event notifications
init_data = {tbl: self.get_table(tbl) for tbl in self.handlers if init_data_handler or self.fire_init_data[tbl]}
# Function to send initial data as series of updates through individual table callback handlers
def load_data(tbl, data):
if self.fire_init_data[tbl]:
for row, x in data.items():
self.__fire(tbl, row, x)
return False
return True
init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)}
# Pass all initial data that we DID NOT send as updates to handlers through the init callback if provided by caller
if init_data_handler:
init_data_handler(init_callback_data)
while True:
item = self.pubsub.listen_message()
if item['type'] == 'pmessage':
key = item['channel'].split(':', 1)[1]
try:
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
if table in self.handlers:
client = self.get_redis_client(self.db_name)
data = self.raw_to_typed(client.hgetall(key))
if table in init_data and row in init_data[table]:
cache_hit = init_data[table][row] == data
del init_data[table][row]
if not init_data[table]:
del init_data[table]
if cache_hit: continue
self.__fire(table, row, data)
except ValueError:
pass #Ignore non table-formated redis entries
## Dynamic typed functions used in python
@staticmethod
def raw_to_typed(raw_data):
if raw_data is None:
return None
typed_data = {}
for raw_key in raw_data:
key = raw_key
# "NULL:NULL" is used as a placeholder for objects with no attributes
if key == "NULL":
pass
# A column key with ending '@' is used to mark list-typed table items
# TODO: Replace this with a schema-based typing mechanism.
elif key.endswith("@"):
value = raw_data[raw_key].split(',')
typed_data[key[:-1]] = value
else:
typed_data[key] = raw_data[raw_key]
return typed_data
@staticmethod
def typed_to_raw(typed_data):
if typed_data is None:
return {}
elif len(typed_data) == 0:
return { "NULL": "NULL" }
raw_data = {}
for key in typed_data:
value = typed_data[key]
if type(value) is list:
raw_data[key+'@'] = ','.join(value)
else:
raw_data[key] = str(value)
return raw_data
# Note: we could not use a class variable for KEY_SEPARATOR, but original dependent code is using
# these static functions. So we implement both static and instance functions with the same name.
# The static function will behave according to ConfigDB separators.
@staticmethod
def serialize_key(key, separator='|'):
if type(key) is tuple:
return separator.join(key)
else:
return str(key)
def _serialize_key(self, key):
return ConfigDBConnector.serialize_key(key, self.KEY_SEPARATOR)
@staticmethod
def deserialize_key(key, separator='|'):
tokens = key.split(separator)
if len(tokens) > 1:
return tuple(tokens)
else:
return key
def _deserialize_key(self, key):
return ConfigDBConnector.deserialize_key(key, self.KEY_SEPARATOR)
def __fire(self, table, key, data):
if table in self.handlers:
handler = self.handlers[table]
handler(table, key, data)
def subscribe(self, table, handler, fire_init_data=False):
self.handlers[table] = handler
self.fire_init_data[table] = fire_init_data
def unsubscribe(self, table):
if table in self.handlers:
self.handlers.pop(table)
def set_entry(self, table, key, data):
key = self.serialize_key(key)
raw_data = self.typed_to_raw(data)
super(ConfigDBConnector, self).set_entry(table, key, raw_data)
def mod_config(self, data):
raw_config = {}
for table_name, table_data in data.items():
raw_config[table_name] = {}
if table_data == None:
continue
for key, data in table_data.items():
raw_key = self.serialize_key(key)
raw_data = self.typed_to_raw(data)
raw_config[table_name][raw_key] = raw_data
super(ConfigDBConnector, self).mod_config(raw_config)
def mod_entry(self, table, key, data):
key = self.serialize_key(key)
raw_data = self.typed_to_raw(data)
super(ConfigDBConnector, self).mod_entry(table, key, raw_data)
def get_entry(self, table, key):
key = self.serialize_key(key)
raw_data = super(ConfigDBConnector, self).get_entry(table, key)
return self.raw_to_typed(raw_data)
def get_keys(self, table, split=True):
keys = super(ConfigDBConnector, self).get_keys(table, split)
ret = []
for key in keys:
ret.append(self.deserialize_key(key))
return ret
def get_table(self, table):
data = super(ConfigDBConnector, self).get_table(table)
ret = {}
for row, entry in data.items():
entry = self.raw_to_typed(entry)
ret[self.deserialize_key(row)] = entry
return ret
def get_config(self):
data = super(ConfigDBConnector, self).get_config()
ret = {}
for table_name, table in data.items():
for row, entry in table.items():
entry = self.raw_to_typed(entry)
ret.setdefault(table_name, {})[self.deserialize_key(row)] = entry
return ret
%}
#endif
class ConfigDBPipeConnector_Native: public ConfigDBConnector_Native
{
public:
ConfigDBPipeConnector_Native(bool use_unix_socket_path = false, const char *netns = "");
void set_entry(std::string table, std::string key, const std::map<std::string, std::string>& data) override;
void mod_config(const std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>& data) override;
std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> get_config() override;
private:
static const int64_t REDIS_SCAN_BATCH_SIZE = 30;
int _delete_entries(DBConnector& client, RedisTransactioner& pipe, const char *pattern, int cursor);
void _delete_table(DBConnector& client, RedisTransactioner& pipe, std::string table);
void _set_entry(RedisTransactioner& pipe, std::string table, std::string key, const std::map<std::string, std::string>& data);
void _mod_entry(RedisTransactioner& pipe, std::string table, std::string key, const std::map<std::string, std::string>& data);
int _get_config(DBConnector& client, RedisTransactioner& pipe, std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>& data, int cursor);
};
#ifdef SWIG
%pythoncode %{
class ConfigDBPipeConnector(ConfigDBConnector, ConfigDBPipeConnector_Native):
## Note: diamond inheritance, reusing functions in both classes
def __init__(self, **kwargs):
super(ConfigDBPipeConnector, self).__init__(**kwargs)
%}
#endif
}