-
Notifications
You must be signed in to change notification settings - Fork 0
/
mongo_tool.py
95 lines (80 loc) · 2.87 KB
/
mongo_tool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/env python
# coding=utf-8
"""
@desc: A simple wrapper around the pymongo module
@author: luluo
@date: 2018/9/10
"""
import pymongo
from pymongo import client_session
class MongoConn(object):
    """Thin convenience wrapper around ``pymongo.MongoClient``.

    The wrapper remembers a "current" database (``self.db``) and a
    "current" collection (``self.coll``).  Collections may be addressed
    either by a bare name, resolved against the current database, or by a
    qualified name of the form ``"<db><sep><collection>"``.

    Attributes that are not defined on the wrapper itself are looked up on
    the current collection (see ``__getattr__``).
    """

    # Separator between database and collection in a qualified name.
    sep = ":"
    # Connection string handed to ``pymongo.MongoClient``; override on a
    # subclass (or on the class) to connect somewhere else.
    default_uri = 'mongodb://localhost:27017'

    def __init__(self, conf=None):
        """Open a client connection to ``default_uri``.

        :param conf: accepted for interface compatibility only.
        """
        # NOTE(review): ``conf`` is currently ignored.  An earlier draft built
        # the URI from its username/password/host/port keys; confirm with the
        # callers before wiring it back in.
        self.client = pymongo.MongoClient(self.default_uri)
        self.db = None
        self.coll = None

    def close(self):
        """Close the client connection.

        If closing fails with a pymongo error, the client reference is
        dropped (``self.client`` becomes ``None``) instead of propagating
        the error.  Calling ``close`` again after that is a no-op.
        """
        if self.client is None:
            return
        try:
            self.client.close()
        except pymongo.errors.PyMongoError:
            self.client = None

    def choose_db(self, db_name):
        """Make ``db_name`` the current database.

        :param db_name: name of the database to select.
        :return: ``True`` on success, ``False`` if the client rejects the
            name (pymongo error, or a name of the wrong type).
        """
        try:
            self.db = self.client[db_name]
        except (pymongo.errors.PyMongoError, TypeError):
            return False
        return True

    def choose_coll(self, coll_name):
        """Make ``coll_name`` the current collection.

        A qualified name (``"<db><sep><collection>"``) also switches the
        current database.  The collection is not checked for existence.

        :param coll_name: bare or qualified collection name.
        :return: ``True`` on success; ``False`` if the database part cannot
            be selected, or if a bare name is given while no database is
            selected.
        """
        if self.sep in coll_name:
            # Split on the first separator only, so that everything after it
            # is treated as the collection name.
            db_name, coll_name = coll_name.split(self.sep, 1)
            if not self.choose_db(db_name):
                return False
        if self.db is None:
            return False
        self.coll = self.db[coll_name]
        return True

    def get_db(self, db_name=None):
        """Return the database ``db_name``, or the current one if omitted.

        Does not change the current database.
        """
        if db_name is None:
            return self.db
        return self.client[db_name]

    def mset(self, coll_name, info):
        """Insert one document (``dict``) or several (``list``).

        The insert runs inside a causally consistent client session, and
        the target collection becomes the current collection.

        :param coll_name: bare or qualified collection name (see
            ``get_coll``).
        :param info: a ``dict`` for a single insert, a ``list`` for a bulk
            insert.
        :return: whatever ``insert_one`` / ``insert_many`` returns.
        :raises TypeError: if ``info`` is neither a ``list`` nor a ``dict``.
        :raises ValueError: if the insert itself fails; the original error
            is passed as the argument of the ``ValueError``.
        """
        # Validate first so that an unsupported payload neither opens a
        # session nor changes the current collection.
        if not isinstance(info, (list, dict)):
            raise TypeError("It doesn't support this type of info")
        with self.client.start_session(causal_consistency=True) as session:
            self.coll = self.get_coll(coll_name)
            try:
                if isinstance(info, list):
                    return self.coll.insert_many(info, session=session)
                return self.coll.insert_one(info, session=session)
            except Exception as e:
                raise ValueError(e)

    def mput(self, coll_name, old, new):
        """Update every document matching ``old`` with the fields in ``new``.

        The fields are applied with ``$set`` and ``lastModified`` is set via
        ``$currentDate``.  The update runs inside a causally consistent
        client session, and the target collection becomes the current
        collection.

        :param coll_name: bare or qualified collection name (see
            ``get_coll``).
        :param old: filter selecting the documents to update.
        :param new: mapping of fields to set on the matched documents.
        :return: whatever ``update_many`` returns.
        :raises ValueError: if the update fails (same convention as
            ``mset``); the original error is passed as the argument.
        """
        update = {"$set": new, "$currentDate": {"lastModified": True}}
        with self.client.start_session(causal_consistency=True) as session:
            self.coll = self.get_coll(coll_name)
            try:
                return self.coll.update_many(old, update, session=session)
            except Exception as e:
                # Raise instead of handing the exception object back as if
                # it were a result.
                raise ValueError(e)

    def get_coll(self, coll_name=None):
        """Return a collection without changing the current selection.

        :param coll_name: ``None`` for the current collection, a bare name
            resolved against the current database, or a qualified
            ``"<db><sep><collection>"`` name.
        :raises ValueError: if a bare name is given while no database is
            selected.
        """
        if coll_name is None:
            return self.coll
        if self.sep in coll_name:
            # Split on the first separator only (see ``choose_coll``).
            db_name, coll_name = coll_name.split(self.sep, 1)
            return self.client[db_name][coll_name]
        if self.db is None:
            raise ValueError(
                "no database selected; call choose_db() first or use a "
                "'<db>%s<collection>' name" % self.sep)
        return self.db[coll_name]

    def __getattr__(self, item, *args, **kwargs):
        """Delegate unknown attributes to the current collection.

        Python only calls this when normal attribute lookup fails, and only
        with ``item``; the extra parameters are kept for compatibility and
        forwarded to ``getattr`` unchanged.

        :raises AttributeError: if no collection is selected (and no
            default was supplied), or the collection lacks the attribute.
        """
        # Read "coll" straight from the instance dict: going through
        # ``self.coll`` would re-enter __getattr__ and recurse without end
        # when the attribute has not been set yet.
        coll = self.__dict__.get("coll")
        if coll is None and not args:
            raise AttributeError(
                "%r object has no attribute %r (no collection selected)"
                % (type(self).__name__, item))
        return getattr(coll, item, *args, **kwargs)