-
Notifications
You must be signed in to change notification settings - Fork 1
/
whois.py
349 lines (312 loc) · 12 KB
/
whois.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
import json
import os
import pygtrie
import random
import re
import sys
from datetime import datetime, timedelta
from .utils import *
logger = get_logger('kusa')
CONFIG = load_config()
BOT = CONFIG['BOT']
ATBOT = f'[CQ:at,qq={BOT}]'
UNKNOWN = None
IDK = '我不知道'
taowa = '我俺你汝是谁爸妈爹娘父母爷奶姥'
qunyou = '群友'
MEMBER = os.path.expanduser('~/.kusa/member.json')
DEFAULT_DATA = {}
class Whois:
def __init__(self, **kwargs):
logger.info('初始化Whois')
if not os.path.exists(MEMBER):
dumpjson(DEFAULT_DATA, MEMBER)
self.api = kwargs['bot_api']
self._rosters = {}
self._update()
async def execute_async(self, message):
msg = message['raw_message'].strip()
group = str(message.get('group_id', ''))
user = str(message.get('user_id', ''))
atbot = False
''' ↓ ONLY FOR GROUP ↓ '''
if not group:
return None
if msg.startswith(ATBOT):
msg = msg[len(ATBOT):].strip()
atbot = True
if atbot and msg == '初始化群友':
return await self.init_group_member(group)
if msg == '查询群友':
return self.get_group_member(group)
if msg == '查询群友名单':
return self.get_group_member_list(group)
if re.search('是不是', msg):
prm = re.search('(.+)是不是(.+)[??]', msg)
if prm and prm[1] and prm[2]:
return self.alias_equals(group, user, prm[1], prm[2])
if atbot and (msg == '是谁?' or msg == '是谁?'):
return self.whois(group, user, ATBOT)
if not atbot and re.match('.*是谁[\??]', msg):
try:
obj = re.match('(.+)是谁[\??]', msg)[1]
return self.whois(group, user, obj)
except:
return IDK
if atbot and msg.startswith('我不是'):
return await self.del_alias(group, user, msg[3:])
if atbot and re.match('.+是.*', msg):
prm = re.match('(.+)是(.*)', msg)
try:
return self.add_alias(group, user, prm[1], prm[2].strip())
except:
return '嗯? {}'.format(str(sys.exc_info()))
if msg.startswith('请叫我') and atbot:
return self.set_default_alias(group, user, msg[3:])
if re.search('我(什么|谁|啥)(也|都)不是', msg) and atbot:
return self.del_all_alias(group, user)
def whois(self, group, user, obj):
self._update()
object = self.object_explainer(group, user, obj)
name = object['name']
uid = object['uid']
data = loadjson(MEMBER)
if name == '我':
obj = name
uid = str(BOT)
if name == '你':
if random.randint(1, 10) == 1:
return '你就是你'
try:
names = data[group][user]
except:
return f'我不认识{name}'
return f'你是{",".join(names)}!'
if not obj:
return IDK
if uid == UNKNOWN:
return f'{obj}?{IDK}'
if obj == name:
if random.randint(1, 10) == 1 or len(data[group][uid]) == 1:
return f'{name}就是{name}'
else:
return f'{name}是{",".join(data[group][uid][1:])}!'
else:
return f'{obj}是{data[group][uid][0]}'
def alias_equals(self, group, user, obj1, obj2):
self._update()
data = loadjson(MEMBER)
if not obj1 or not obj2:
return '啥?'
if obj1 == obj2:
return '这不是一样嘛'
root1 = self.object_explainer(group, user, obj1)['name']
root2 = self.object_explainer(group, user, obj2)['name']
if root1 == '我':
root1 = self.object_explainer(group, user, str(BOT))['name']
elif root1 == '你':
root1 = self.object_explainer(group, user, user)['name']
if root2 == '我':
root2 = self.object_explainer(group, user, str(BOT))['name']
elif root2 == '你':
root2 = self.object_explainer(group, user, user)['name']
if obj1 == '你':
obj1 = '我'
elif obj1 == '我':
obj1 = '你'
if obj2 == '你':
obj2 = '我'
elif obj2 == '我':
obj2 = '你'
if root1 and root2:
if root1 == root2:
return f'{obj1}是{root1},{obj2}也是{root2},所以{obj1}是{obj2}'
else:
return f'{obj1}是{root1},{obj2}是{root2},所以{obj1}不是{obj2}'
else:
return random.choice([
IDK,
'难说,毕竟兵不厌诈',
'不好说,我只能说懂的都懂',
'不知道,毕竟我只是一只小猫咪',
'それはどうかな…',
])
def add_alias(self, group, user, subject, object):
if not object:
return '是啥?'
self._update()
data = loadjson(MEMBER)
sbj = self.object_explainer(group, user, subject)
''' 如果默认名字存在,则obj是object对应的默认名字 '''
''' 如果默认名字不存在,则obj是object '''
obj = self.object_explainer(group, user, object)['name'] or object
''' 获取obj的owner '''
owner = self.get_uid(group, obj)
''' obj已被占用,即object存在对应的默认名字,现在obj是object对应的默认名字 '''
if owner != UNKNOWN:
''' 尝试将object转换成已记录的大小写形式 '''
# 遍历群友名单中owner的所有名字
for n in data[group][owner]:
# 除了大小写可能不同以外完全一致
if n.lower() == object.lower():
obj = n
break
# owner是sbj本身
if owner == sbj['uid']:
return f'{sbj["name"]}已经是{obj}了'
# owner是其他人
return f'唔得,{data[group][owner][0]}已经是{obj}了'
''' obj未被占用,即object不存在对应的默认名字,现在obj是object '''
if obj in taowa or True in [i in obj for i in taowa]:
return '不准套娃'
if qunyou in obj:
return '唔得,大家都是群友'
''' sbj准备 '''
if group not in data:
data[group] = {}
if sbj['uid'] not in data[group]:
if sbj['uid'] == UNKNOWN:
return f'我们群里有{subject}吗?'
data[group][sbj['uid']] = []
data[group][sbj['uid']].append(obj)
dumpjson(data, MEMBER)
self._update()
if not sbj["name"]:
sbj["name"] = subject
return f'好,现在{sbj["name"]}是{object}了'
def set_default_alias(self, group, user, name):
self._update()
if self.add_alias(group, user, '我', name) == '不准套娃':
return '不准套娃'
data = loadjson(MEMBER)
data[group][user].remove(name)
data[group][user] = [name,] + data[group][user]
dumpjson(data, MEMBER)
self._update()
return f'好的,{name}'
async def del_alias(self, group, user, name):
self._update()
data = loadjson(MEMBER)
if group not in data:
return None
elif user not in data[group]:
return '你谁啊?'
elif name in data[group][user]:
reply = f'好,你不再是{name}了'
data[group][user].remove(name)
if len(data[group][user]) == 0:
data[group].pop(user)
await self.init_group_member(group)
reply += '\n现在你是{}了'.format(data[group][user][0])
dumpjson(data, MEMBER)
self._update()
return reply
else:
return f'你本来就不是{name}'
def del_all_alias(self, group, user):
self._update()
data = loadjson(MEMBER)
if group not in data:
return None
elif user not in data[group]:
return '你谁啊?'
else:
if len(data[group][user]) == 1:
return f'唔得,你只剩下{data[group][user][0]}了'
to_del = data[group][user][1:]
data[group][user] = data[group][user][0:1]
dumpjson(data, MEMBER)
self._update()
return f'好,你不再是{",".join(to_del)}了'
def get_uid(self, group, obj):
self._update()
''' 如果obj是at,返回obj中的uid '''
try:
uid = re.match('\[CQ:at,qq=(\d+)\]', obj.strip())[1]
return uid
except:
pass
''' 如果obj本身就是uid,返回obj '''
data = loadjson(MEMBER)
if group in data and obj in data[group]:
return obj
''' 如果obj不是at也不是uid,从群友名单中找obj对应的uid '''
if group in self._rosters:
if obj.lower() in self._rosters[group]:
return self._rosters[group][obj.lower()]
''' 找不到,返回UNKNOWN '''
return UNKNOWN
def object_explainer(self, group, user, obj) -> dict:
'''
输入为用户视角:“我”是用户,“你”是BOT
输出为BOT视角 :“我”是BOT,“你”是用户
'''
self._update()
obj = obj.strip()
data = loadjson(MEMBER)
if obj == '我':
uid = user
name = '你'
elif obj == '你':
uid = str(BOT)
name = '我'
else:
# obj对应的uid和默认名字
# 若不存在则为UNKNOWN
uid = self.get_uid(group, obj)
try:
name = data[group][uid][0]
except:
name = UNKNOWN
return {'uid': uid, 'name': name}
def get_group_member(self, group):
data = loadjson(MEMBER)
if group in data and len(data[group]):
return '本群有{}群友'.format(len(data[group]))
def get_group_member_list(self, group):
data = loadjson(MEMBER)
if group in data and len(data[group]):
return '本群群友有{}'.format(",".join([data[group][p][0] for p in data[group].keys()]))
else:
return "没有查询到本群群友"
async def init_group_member(self, group):
data = loadjson(MEMBER)
group_member_list = await self.api.get_group_member_list(group_id=group)
nickname_set = set()
user_id_set = set()
if group not in data:
data[group] = {}
cnt_old = len(data[group])
cnt_add = 0
cnt_del = 0
cnt_new = len(group_member_list)
for member in group_member_list:
nickname = member.get('card', '')
if not nickname:
nickname = member.get('nickname', '无法获取昵称')
while nickname in nickname_set:
nickname += '2'
nickname_set.add(nickname)
user_id_set.add(str(member['user_id']))
if str(member['user_id']) not in data[group]:
data[group][str(member['user_id'])] = [nickname,]
cnt_add += 1
for member in list(data[group].keys()):
if member not in user_id_set:
data[group].pop(member)
cnt_del += 1
dumpjson(data, MEMBER)
reply = '群 {} 初始化群友完成,原来有{}群友,新增了{}群友,移除了{}群友,现在有{}群友'.format(
group, cnt_old, cnt_add, cnt_del, cnt_new
)
logger.info(reply)
return reply
def _update(self):
data = loadjson(MEMBER)
self._rosters = {}
for group in data:
self._rosters[group] = pygtrie.CharTrie()
for user in data[group]:
for name in data[group][user]:
if name.lower() not in self._rosters[group]:
self._rosters[group][name.lower()] = user