#!/usr/bin/env python3
# -*-python-*-
import pump_transfer
import pump
import base64
import optparse
import os
import platform
import queue
import re
import json
import subprocess
import sys
import threading
import time
import urllib.request, urllib.error, urllib.parse
from typing import Any, List
from cluster_manager import ClusterManager
"""Written by Daniel Owen owend@couchbase.com on 27 June 2014
Version 1.4 Last updated 10 July 2014
The current implementation of cbbackup that comes with Couchbase Server 2.5.1
uses only one thead per node. Therefore when using cbbackup with the single-node
parameter we are limited to one thread - this impacts performance.
This script provides a wrapper to invoke multiple cbbackup processes.
It automatically detects which buckets and vbuckets are
on the node. It allow the user to specify how many vbuckets to backup in a single
cbbackup process and then invokes the necessary number of processes.
An example invocation is as follows:
python cbbackupwrapper.py http://127.0.0.1:8091 ../backup/ --single-node -n 4 \
-u Administrator -p myPassword --path /opt/couchbbase/bin/ -v
This will backup all the buckets on node 127.0.0.1 into ../backup
It will backup 4 vbuckets per cbbackup process
Access to the cluster is authenticated using username=Administrator and
password=myPassword.and cbbackup will be found in /opt/couchbase/bin
Run python cbbackupwrapper -h for more information.
See the cbrestorewrapper.py script for restoring backups made with this script."""
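
# Module-level state shared with the worker threads: the main thread fills
# process_queue with cbbackup command lines, the consumer threads drain it,
# backup_complete tells idle workers to exit, and lock serialises console
# output.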
bucketList = []
vbucketList = []
backup_complete = False
process_queue = queue.Queue()
lock = threading.Lock()
def _exit_if_errors(errors):
if errors:
for error in errors:
print("ERROR: " + error)
sys.exit(1)
def opt_extra_help(parser, extra_defaults):
extra_help = "; ".join([f'{k}={extra_defaults[k][0]} ({extra_defaults[k][1]})'
for k in sorted(extra_defaults.keys())])
group = optparse.OptionGroup(parser, "Available extra config parameters (-x)",
extra_help)
parser.add_option_group(group)
def opt_extra_defaults():
return {
"batch_max_size": (1000, "Transfer this # of documents per batch"),
"batch_max_bytes": (400000, "Transfer this # of bytes per batch"),
"cbb_max_mb": (100000, "Split backup file on destination cluster if it exceeds MB"),
"max_retry": (10, "Max number of sequential retries if transfer fails"),
"report": (5, "Number batches transferred before updating progress bar in console"),
"report_full": (2000, "Number batches transferred before emitting progress information in console"),
"recv_min_bytes": (4096, "Amount of bytes for every TCP/IP call transferred"),
"rehash": (0, "For value 1, rehash the partition id's of each item; \
this is needed when transferring data between clusters with different number of partitions, \
such as when transferring data from an OSX server to a non-OSX cluster"),
"conflict_resolve":(1, "By default, enable conflict resolution."),
"data_only": (0, "For value 1, only transfer data from a backup file or cluster"),
"design_doc_only": (0, "For value 1, transfer design documents only from a backup file or cluster"),
"seqno": (0, "By default, start seqno from beginning."),
"uncompress": (0, "For value 1, restore data in uncompressed mode"),
"backoff_cap": (10, "Max backoff time during rebalance period"),
"flow_control": (1, "For value 0, disable flow control to improve throughput"),
"dcp_consumer_queue_length": (1000,"A DCP client needs a queue for incoming documents/messages. A large length is more efficient, but memory proportional to length*avg. doc size. Below length 150, performance degrades significantly."),
}
def opt_parse_extra(extra, extra_defaults):
"""Convert an extra string (comma-separated key=val pairs) into
a dict, using default values from extra_defaults dict."""
extra_in = dict([(x[0], x[1]) for x in
[(kv + '=').split('=') for kv in
(extra or "").split(',')]])
for k, v in extra_in.items():
if k and not extra_defaults.get(k):
sys.exit("error: unknown extra option: " + k)
return dict([(k, float(extra_in.get(k, extra_defaults[k][0])))
for k in extra_defaults.keys()])
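
# For example, opt_parse_extra("batch_max_size=500,report=10", opt_extra_defaults())
# returns the full defaults dict with batch_max_size=500.0 and report=10.0;
# all values are coerced to float, and an unknown key aborts with an error.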
def argumentParsing():
usage = "usage: %prog CLUSTER BACKUPDIR OPTIONS"
parser = optparse.OptionParser(usage)
opt_extra_help(parser, opt_extra_defaults())
parser.add_option('-b', '--bucket-source', default='',
help='Specify the bucket to backup. Defaults to all buckets')
parser.add_option('--single-node', action='store_true',
default=False, help='use a single server node from the source only')
parser.add_option('-u', '--username', default='Administrator',
help='REST username for source cluster or server node. Default is Administrator')
parser.add_option('-p', '--password', default='PASSWORD',
help='REST password for source cluster or server node. Defaults to PASSWORD')
parser.add_option("-s", "--ssl",
action="store_true", default=False,
help="Transfer data with SSL enabled")
parser.add_option('-v', '--verbose', action='store_true',
default=False, help='Enable verbose messaging')
parser.add_option('--path', default='.',
help='Specify the path to cbbackup. Defaults to current directory')
parser.add_option('--port', default='11210',
help='Specify the bucket port. Defaults to 11210')
parser.add_option('-n', '--number', default='100',
help='Specify the number of vbuckets per process. Defaults to 100')
parser.add_option('-P', '--parallelism', default='1',
help='Number of vbucket backup jobs to run at a time. Defaults to 1')
parser.add_option('-x', '--extra', default=None,
help="""Provide extra, uncommon config parameters;
comma-separated key=val(,key=val)* pairs""")
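    # pump_bfd2 is only present in builds that support incremental backup;
    # when it can be imported, expose the -m/--mode flag (full, diff or accu),
    # otherwise hide the flag and always run a full backup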
try:
import pump_bfd2
parser.add_option("-m", "--mode",
action="store", type="string", default="diff",
help="backup mode: full, diff or accu [default:%default]")
except ImportError:
parser.add_option("-m", "--mode",
action="store", type="string", default="full",
help=optparse.SUPPRESS_HELP)
options, rest = parser.parse_args()
if len(rest) != 2:
parser.print_help()
sys.exit("\nError: please provide both cluster IP and backup directory path.")
opt_parse_extra(options.extra, opt_extra_defaults())
return options, rest[0], rest[1]
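
# Return the vbucket ids for the given bucket. For a cluster-wide backup this
# is every vbucket; with --single-node it is only the vbuckets whose active
# copy lives on the node being queried (identified via the "thisNode" flag in
# the REST response).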
def findAllVbucketsForBucket(node: str, bucket: str, restport: str, username: str, password: str,
single_node: bool) -> List[Any]:
cluster = "http://" + node + ":" + restport
rest = ClusterManager(cluster, username, password, False, False, None, False)
result, errors = rest.get_bucket(bucket)
_exit_if_errors(errors)
if not single_node:
return list(range(len(result['vBucketServerMap']['vBucketMap'])))
    else:
        thisNode = None
        for cluster_node in result["nodes"]:
            if "thisNode" in cluster_node and cluster_node["thisNode"]:
                thisNode = cluster_node["hostname"].split(":")[0]
                break
        if thisNode is None:
            _exit_if_errors(["Unable to find vbuckets for %s, could not locate thisNode" % cluster])
        serverIdx = None
        serverList = result['vBucketServerMap']['serverList']
        for index, server in enumerate(serverList):
            if server.split(":")[0] == thisNode:
                serverIdx = index
                break
        if serverIdx is None:
            _exit_if_errors(["Unable to find vbuckets for %s, thisNode not in serverList" % cluster])
        vbuckets = []
        vbucketMap = result['vBucketServerMap']['vBucketMap']
        for vbid, servers in enumerate(vbucketMap):
            if servers[0] == serverIdx:
                vbuckets.append(vbid)
        return vbuckets
# Get the buckets that exist on the cluster
def getBuckets(node: str, rest_port: str, username: str, password: str) -> List[Any]:
request = urllib.request.Request(
f'http://{node}:{rest_port}/pools/default/buckets')
base64string = base64.encodebytes(f'{username}:{password}'.encode()).decode().replace('\n', '')
request.add_header('Authorization', f'Basic {base64string}')
    try:
        response = urllib.request.urlopen(request)
    except urllib.error.HTTPError:
        print('Authorization failed. Please check username and password.')
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f'Could not connect to {node}:{rest_port}: {e.reason}')
        sys.exit(1)
bucketsOnCluster = []
data = json.loads(response.read())
for item in data:
if item['bucketType'] == 'memcached':
print(f'skipping bucket that is not a couchbase-bucket: {item["name"]}')
else:
bucket = item['name']
bucketsOnCluster.append(bucket)
return bucketsOnCluster
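
# Worker thread body: pull cbbackup command lines off the shared queue and run
# each in a shell, reporting any failed command. Workers return once
# backup_complete is set or the queue has drained.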
def consumer(worker_id: int, verbose: bool):
    while True:
        try:
            if backup_complete:
                return
            commandline = process_queue.get(block=True, timeout=2)
            if verbose:
                with lock:
                    print(f'T({worker_id}): {commandline}')
            p = subprocess.Popen(commandline, shell=True)
            p.wait()
            if p.returncode != 0:
                with lock:
                    print(f'Error running backup command: {commandline}')
            process_queue.task_done()
            time.sleep(1)
        except Exception as e:
            # process_queue.get raises queue.Empty on timeout; if the queue
            # has drained, this worker is finished
            if process_queue.empty():
                return
            print(f'Exception: {str(e)}')
if __name__ == '__main__':
# Parse the arguments given.
args, cluster, backupDir = argumentParsing()
backup_exe = 'cbbackup'
if platform.system() == "Windows":
backup_exe = 'cbbackup.exe'
    # Strip leading and trailing whitespace from the directory arguments
backupDir = backupDir.strip()
path = args.path.strip()
if path == ".":
path = os.path.abspath(path)
if backupDir == ".":
backupDir = os.path.abspath(backupDir)
# Check to see if root backup directory exists
if not os.path.isdir(backupDir):
        try:
            os.makedirs(backupDir)
        except OSError:
            sys.exit(f'Cannot create backup root directory: {backupDir}')
# Check to see if path is correct
if not os.path.isdir(path):
print('The path to cbbackup does not exist')
print('Please run with a different path')
sys.exit(1)
if not os.path.isfile(os.path.join(path, backup_exe)):
print(f'cbbackup could not be found in {path}')
sys.exit(1)
    # Create the log directory if it does not already exist
    log_dir = os.path.join(backupDir, 'logs')
    if not os.path.isdir(log_dir):
        try:
            os.mkdir(log_dir)
        except OSError:
            print(f'Error trying to create directory {log_dir}')
            sys.exit(1)
# Separate out node and REST port
matchObj = re.match(r'^http://(.*):(\d+)$', cluster, re.I)
if matchObj:
node = matchObj.group(1)
rest = matchObj.group(2)
else:
print("Please enter the source as http://hostname:port")
print("For example http://localhost:8091 or http://127.0.0.1:8091")
sys.exit(1)
    # Check whether we are backing up all buckets or just a specified bucket
if args.bucket_source == '':
bucketList = getBuckets(
node, rest, args.username, args.password)
else:
# Check that the bucket exists
for item in getBuckets(node, rest, args.username, args.password):
if item == args.bucket_source:
bucketList.append(args.bucket_source)
if len(bucketList) == 0:
print(f'Bucket {args.bucket_source} does not exist')
print('Please enter a different bucket')
sys.exit(1)
    # For each bucket, collect the vbuckets to back up
    for bucket in bucketList:
        perbucketvbucketlist = findAllVbucketsForBucket(
            node, bucket, rest, args.username, args.password, args.single_node)
        for vbucket in perbucketvbucketlist:
            if vbucket not in vbucketList:
                vbucketList.append(vbucket)
    # If a bucket was specified then set up the string to pass to cbbackup.
specific_bucket = ''
if len(bucketList) == 1:
specific_bucket = ' -b ' + bucketList[0]
extra_options = ''
if args.extra:
extra_options = ' -x ' + args.extra
mode_options = ''
if args.mode:
mode_options = ' -m ' + args.mode
ssl_option = ''
if args.ssl:
ssl_option = ' -s '
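
    # Start the requested number of daemon worker threads; each runs queued
    # cbbackup commands via consumer() until the work runs out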
worker_threads = []
for i in range(int(args.parallelism)):
t = threading.Thread(target=consumer, args=(i, args.verbose,))
t.daemon = True
t.start()
worker_threads.append(t)
    # Group vbuckets into chunks of args.number; each chunk becomes one
    # cbbackup invocation
    print('Waiting for the backup to complete...')
for i in range(0, len(vbucketList), int(args.number)):
chunk = vbucketList[i:i + int(args.number)]
vbucketsname = str(chunk[0]) + '-' + str(chunk[-1])
command_line = '"' + os.path.join(path, backup_exe) + '"' + ' -v -t 1 --vbucket-list=' \
+ ''.join(str(chunk).split()) + ' http://' + node + ':' + rest + ' ' \
+ '"' + os.path.join(backupDir, vbucketsname) + '"' + ' -u ' + args.username \
+ ' -p ' + args.password + extra_options + mode_options + ssl_option + specific_bucket \
+ ' 2> ' + '"' + os.path.join(backupDir, 'logs', vbucketsname) + '.err' + '"'
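        # The queued command looks roughly like (paths and credentials here
        # are illustrative):
        #   "/opt/couchbase/bin/cbbackup" -v -t 1 --vbucket-list=[0,1,2,3] \
        #       http://127.0.0.1:8091 "../backup/0-3" -u Administrator -p ... \
        #       2> "../backup/logs/0-3.err"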
process_queue.put(command_line)
process_queue.join()
backup_complete = True
for worker in worker_threads:
worker.join()
with lock:
print('SUCCESSFULLY COMPLETED!')