Ramen merge attempt #1 (Print subvolume path alongside subvolume ID) #3

Open · wants to merge 13 commits into base: master
160 changes: 105 additions & 55 deletions subvolume.py → 0_check-btrfs-sub-size-diff__original.py
100644 → 100755
@@ -24,8 +24,8 @@
import math, array
from functools import lru_cache
import multiprocessing,os
#from sortedcontainers import SortedList
import bisect

#function to convert a pair of positive integers to a single integer
#we want to decrease memory consumption, thus we need this trick
#http://szudzik.com/ElegantPairing.pdf
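For reference, Szudzik's pairing maps two non-negative integers to one unique integer while keeping the result below (max(a,b)+1)**2, which is what makes it memory-friendly here. A minimal sketch, with the illustrative name szudzik_pair (the file's own helper is defined in a part of the file not shown in this diff):

def szudzik_pair(a, b):
    #Szudzik's elegant pairing (http://szudzik.com/ElegantPairing.pdf)
    #the branch keeps the encoding dense: the result stays below
    #(max(a, b) + 1) ** 2, yet remains uniquely decodable
    return a * a + a + b if a >= b else b * b + a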
@@ -221,7 +221,7 @@ def add(self,tree,key,start,stop,inode):
#do not search deep, since we add one subvolume at a time
ranges=[]
for myrange,snapshotinodelist in extent.items():
-if find_snapshot_in_list(snapshotinodelist,tree,False):
+if find_snapshot_in_list(snapshotinodelist,tree,True):
ranges.append(myrange)
ranges.sort()

@@ -274,9 +274,9 @@ def add(self,tree,key,start,stop,inode):
#if newbase differs from base at one of its ends, then that end must
#be deleted because that end of the interval will be added
if base.lower>target.lower and newbase.lower!=base.lower:
-self.delete_range(key,base.lower,tree,False)
+self.delete_range(key,base.lower,tree,True)
if base.upper<target.upper and newbase.upper!=base.upper:
-self.delete_range(key,base.upper,tree,False)
+self.delete_range(key,base.upper,tree,True)

#target must be modified as well if the intervals partially
#exist already in base
@@ -418,10 +418,10 @@ def find_snapshot_size_to_current(self):
results[snapshot]+=self.find_snapshots_size([snapshot],[current])
return results

-def disk_parse_queue(semaphore,lock,barrier,queue,path,tree):
-with semaphore:
-pid = os.getpid()
-print("Parsing subvolume:",tree,"pid",pid)
+#try to optimize parsing by piping, but to no avail
+def disk_parse_pipe(pipe,path,tree):
+print("Parsing subvolume:",tree)
fs=btrfs.FileSystem(path)
min_key=btrfs.ctree.Key(0,btrfs.ctree.EXTENT_DATA_KEY,0)
for header, data in btrfs.ioctl.search_v2(fs.fd, tree,min_key):
@@ -432,28 +432,99 @@ def disk_parse_queue(semaphore,lock,barrier,queue,path,tree):
#key = pool.apply(unique_number, (datum.disk_bytenr,datum.disk_num_bytes,))
stop=datum.offset+datum.num_bytes
#key=res.get()
-queue.put((key,datum.offset,stop,datum.key.objectid,tree))
-if lock.acquire(timeout=0):
-#print("Waiting for queue to empty",tree,pid)
-queue.join()
-#print("queue emptied",tree,pid)
-lock.release()
-#queue.put(None)
+pipe.send((key,datum.offset,stop,datum.key.objectid))
+pipe.send(None)
+pipe.close()
os.close(fs.fd)
del fs
-if lock.acquire():
-#print("Waiting for queue to empty",tree,pid)
-queue.join()
-#print("queue emptied",tree,pid)
-print("Finished subvolume:",tree,pid)
-lock.release()
-i=barrier.wait()
-if i == 0:
-queue.put(None)
-queue.join()
-queue.close()
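For context on the block removed above: each producer held a slot of the bounded semaphore while reading, used lock.acquire(timeout=0) plus queue.join() as crude backpressure so the consumer could drain, and barrier.wait() elected exactly one process to enqueue the None sentinel. A minimal sketch of that sentinel-election idiom, independent of this codebase (producer, items and the barrier/queue wiring are illustrative):

def producer(barrier, queue, items):
    #each producer enqueues its share of the work, then meets the others
    #at the barrier; wait() returns a distinct index per process, so
    #exactly one producer (index 0) sends the shutdown sentinel
    for item in items:
        queue.put(item)
    if barrier.wait() == 0:
        queue.put(None)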



+def pipe_add(data_tree,path,tree,analyze_files):
+parent_conn, child_conn = multiprocessing.Pipe(False)
+p = multiprocessing.Process(target=disk_parse_pipe, args=(child_conn,path,tree,))
+p.start()
+while True:
+res=parent_conn.recv()
+if res !=None:
+if analyze_files:
+data_tree.add(tree,res[0],res[1],res[2],res[3])
+else:
+data_tree.add(tree,res[0],res[1],res[2],0)
+else:
+break
+p.join()
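In main() below, this pipe-based variant would be driven by a call of the form pipe_add(data_tree,args.path,tree,args.files); the patch keeps that call commented out in favour of disk_parse_parallel.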


+#try to optimize parsing by using multiprocessing


+#return the data to add for this extent
+#unfortunately we have to reopen the filesystem each time and partially
+#reparse the data
+def actual_extent_parsing(item):
+header,path,tree=item
+result=None
+fs=btrfs.FileSystem(path)
+key=btrfs.ctree.Key(header.objectid,btrfs.ctree.EXTENT_DATA_KEY,header.offset)
+for header,data in btrfs.ioctl.search_v2(fs.fd, tree,key,nr_items=1):
+datum=btrfs.ctree.FileExtentItem(header,data)
+if datum.type != btrfs.ctree.FILE_EXTENT_INLINE:# and datum.disk_bytenr !=0:
+key=unique_number(datum.disk_bytenr,datum.disk_num_bytes)
+stop=datum.offset+datum.num_bytes
+result = (key,datum.offset,stop,datum.key.objectid)
+os.close(fs.fd)
+del fs
+return result
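Note that result stays None when the extent item is inline, which is why the callers below filter on res!=None before inserting into the tree.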


+#main function to parse data from disk and generate the 'interesting' extents
+def generate_extents(path,tree):
+#print("Parsing subvolume:",tree,path)
+#pool = multiprocessing.Pool(processes=1)
+fs=btrfs.FileSystem(path)
+min_key=btrfs.ctree.Key(0,btrfs.ctree.EXTENT_DATA_KEY,0)
+for header, _ in btrfs.ioctl.search_v2(fs.fd, tree,min_key):
+if header.type == btrfs.ctree.EXTENT_DATA_KEY:
+yield header,path,tree
+os.close(fs.fd)
+del fs
+return
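The generator hands each worker a (header, path, tree) tuple rather than an open FileSystem: under the 'spawn' start method set in main(), an open file descriptor cannot be shipped to a worker process, so each task reopens the filesystem itself, which is the reparsing cost the comment above laments.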


+#parallelize parsing; the results come back unordered. what is the best value for the chunk size?
+def disk_parse_parallel(pool,data_tree,path,tree,analyze_files):
+print("Parsing subvolume:",tree)
+#pool = multiprocessing.Pool(processes=4)
+#fs=btrfs.FileSystem(path)
+for res in pool.imap_unordered(actual_extent_parsing, generate_extents(path,tree),128):
+#print(res)
+if res!=None:
+if analyze_files:
+data_tree.add(tree,res[0],res[1],res[2],res[3])
+else:
+data_tree.add(tree,res[0],res[1],res[2],0)
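On the chunk size question above, one hedged way to choose is to time a few candidate values on a sample of extents; time_chunksize, parse_one and sample_items below are placeholders, not part of this patch:

import time

def time_chunksize(pool, parse_one, sample_items, chunksize):
    #drain imap_unordered over the sample and report wall-clock time;
    #page-cache effects make repeated runs optimistic, so treat the
    #numbers as relative, not absolute
    start = time.monotonic()
    for _ in pool.imap_unordered(parse_one, sample_items, chunksize):
        pass
    return time.monotonic() - start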



+#main function to parse data from disk and add it to the tree of extents, sequentially
+def disk_parse(data_tree,fs,tree,analyze_files):
+print("Parsing subvolume:",tree)
+#pool = multiprocessing.Pool(processes=1)
+min_key=btrfs.ctree.Key(0,btrfs.ctree.EXTENT_DATA_KEY,0)
+for header, data in btrfs.ioctl.search_v2(fs.fd, tree,min_key):
+if header.type == btrfs.ctree.EXTENT_DATA_KEY:
+datum=btrfs.ctree.FileExtentItem(header,data)
+if datum.type != btrfs.ctree.FILE_EXTENT_INLINE:# and datum.disk_bytenr !=0:
+key=unique_number(datum.disk_bytenr,datum.disk_num_bytes)
+#key = pool.apply(unique_number, (datum.disk_bytenr,datum.disk_num_bytes,))
+stop=datum.offset+datum.num_bytes
+#key=res.get()
+if analyze_files:
+data_tree.add(tree,key,datum.offset,stop,datum.key.objectid)
+else:
+data_tree.add(tree,key,datum.offset,stop,0)
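Unlike the parallel variants above, which receive the mount path and open the filesystem per process, this sequential fallback takes an fs handle that is already open; the commented-out call in main() below shows the intended invocation.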

def main():
multiprocessing.set_start_method('spawn')
parser = argparse.ArgumentParser()
@@ -463,8 +534,6 @@ def main():
help="path of the btrfs filesystem")
parser.add_argument("-r", "--root", type=int,default=5,
help="current active subvolume to analyze first, default is 5")
parser.add_argument("-c", "--cpus", type=int, default=2*len(os.sched_getaffinity(0)),
help="Number of disk reading processes to spawn. More processes means more memory usage.")
group = parser.add_mutually_exclusive_group()
group.add_argument('-i', '--ignore', action='store_true',help="Do not analyze the specified subvolumes")
group.add_argument('-o', '--only', action='store_true',help="Analyze only the specified subvolumes")
@@ -489,7 +558,7 @@ def main():
args.ignore=True

#remove the unneeded subvolumes
-if args.ignore:
+if args.ignore:
for item in special_subvolumes:
try:
parse_trees.remove(item)
@@ -508,36 +577,17 @@
changed_snapshots.rotate(-1)
parse_trees=list(changed_snapshots)
data_tree.add_snapshots(parse_trees)

#parse the trees from newer to older
parse_trees=list(reversed(parse_trees))
+pool = multiprocessing.Pool(processes=4)
print("Subvolumes to parse:",parse_trees)

-queue = multiprocessing.JoinableQueue()
-barrier = multiprocessing.Barrier(len(parse_trees))
-lock = multiprocessing.RLock()
-semaphore = multiprocessing.BoundedSemaphore(args.cpus)
-list_of_producers=[]
for tree in parse_trees:
-#process=multiprocessing.Process(target=disk_parse_queue,name="Sub"+str(tree), args=(semaphore,queue,args.path,tree),daemon=True)
-process=multiprocessing.Process(target=disk_parse_queue,name="Sub"+str(tree), args=(semaphore,lock,barrier,queue,args.path,tree),daemon=True)
-list_of_producers.append(process)
-process.start()

-while True:
-res=queue.get()
-queue.task_done()
-if res !=None:
-if args.files:
-data_tree.add(res[4],res[0],res[1],res[2],res[3])
-else:
-data_tree.add(res[4],res[0],res[1],res[2],0)
-else:
-break

-print("Parsing Done")
-for process in list_of_producers:
-process.join()
+#disk_parse(data_tree,fs,tree,args.files)
+disk_parse_parallel(pool,data_tree,args.path,tree,args.files)
+#pipe_add(data_tree,args.path,tree,args.files)
+pool.close()
+pool.join()

data_tree.transform()
unique_sum=0
5 changes: 5 additions & 0 deletions 1_check-btrfs-sub-size-diff__prototype.sh
@@ -0,0 +1,5 @@
+sudo btrfs subvolume list $1 --sort=-rootid
+echo -e "\n############################################################################\n"
+sudo ./"0_check-btrfs-sub-size-diff__original.py" $1
+echo -e "\n############################################################################\n"
+sudo btrfs subvolume list $1 --sort=-rootid
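Assuming the filesystem is mounted at /mnt/pool (an illustrative path), the wrapper is invoked with the mount point as its only argument: ./1_check-btrfs-sub-size-diff__prototype.sh /mnt/pool. It prints the subvolume list (rootid and path) before and after the Python analysis, so the subvolume IDs in the analysis output can be matched to their paths.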