load big files in chunks #66

Merged · 9 commits · Apr 27, 2016
easyaccess/easyaccess.py: 250 changes (210 additions, 40 deletions)
@@ -56,6 +56,10 @@ def colored(line, color): return line
import webbrowser
import signal

class KeyParser(argparse.ArgumentParser):
def error(self, message):
sys.exit(2)

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p')

@@ -1799,7 +1803,7 @@ def create_table(self, table, columns, dtypes):
self.cur.execute(qtable)
if self.autocommit: self.con.commit()

def insert_data(self, table, columns, values, dtypes=None):
def insert_data(self, table, columns, values, dtypes=None, niter = 0):
"""Insert data into a DB table.
Trim trailing whitespace from string columns. Because of the
@@ -1845,31 +1849,59 @@ def insert_data(self, table, columns, values, dtypes=None):
raise cx_Oracle.DatabaseError(msg)

print(colored(
'\n Inserted %d rows and %d columns into table %s in %.2f seconds' % (
len(values), len(columns), table.upper(), t2 - t1), "green"))
'\n [Iter: %d] Inserted %d rows and %d columns into table %s in %.2f seconds' % (
niter+1, len(values), len(columns), table.upper(), t2 - t1), "green"))


def do_load_table(self, line, name=''):
"""
DB:Loads a table from a file (csv or fits) taking name from filename and columns from header
Usage: load_table <filename>
Usage: load_table <filename> [--tablename NAME] [--chunksize CHUNK]
Ex: example.csv has the following content
RA,DEC,MAG
1.23,0.13,23
0.13,0.01,22
This command will create a table named EXAMPLE with 3 columns RA,DEC and MAG and values taken from file
Optional Arguments:
--tablename NAME given name for the table, default is taken from filename
--chunksize CHUNK Number of rows to be inserted at a time. Useful for large files
that do not fit in memory
Note: - For csv or tab files, first line must have the column names (without # or any other comment) and same format
as data (using ',' or space)
- For fits file header must have columns names and data types
- For filenames use <table_name>.csv or <table_name>.fits do not use extra points
"""
filename = self.get_filename(line)
line = line.replace(';','')
load_parser = KeyParser(prog='', usage='', add_help=False)
load_parser.add_argument('filename', help='name for the file', action='store', default=None)
load_parser.add_argument('--tablename', help='name for the table', action='store', default='')
load_parser.add_argument('--chunksize', help='number of rows to read in blocks to avoid memory '
'issues', action='store', type=int, default=None)
load_parser.add_argument('-h', '--help', help='print help', action='store_true')
try:
load_args = load_parser.parse_args(line.split())
except SystemExit:
self.do_help('load_table')
return
if load_args.help:
self.do_help('load_table')
return
filename = self.get_filename(load_args.filename)
name = load_args.tablename
chunk = load_args.chunksize
Collaborator:
This is a cool use of argparser

Owner Author:
Yeah, and some other commands can be expanded as well to add optional arguments
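As a rough illustration of the pattern discussed in this thread (a sketch only; the sample argument string and the print calls are illustrative and not part of this PR), subclassing argparse.ArgumentParser and silencing error() lets an interactive command accept optional flags and fall back to its own help on bad input:

```python
import argparse
import sys

class KeyParser(argparse.ArgumentParser):
    # Exit quietly on bad arguments; the caller catches SystemExit and
    # shows the command's own help instead of argparse's usage text.
    def error(self, message):
        sys.exit(2)

parser = KeyParser(prog='', usage='', add_help=False)
parser.add_argument('filename')
parser.add_argument('--tablename', default='')
parser.add_argument('--chunksize', type=int, default=None)

try:
    args = parser.parse_args('example.csv --tablename MYTABLE --chunksize 50000'.split())
    print(args.filename, args.tablename, args.chunksize)
except SystemExit:
    print('bad arguments; an interactive command would call its own help here')
```

With valid input this prints example.csv MYTABLE 50000; with a bad flag it lands in the except branch, which is where do_load_table calls do_help('load_table').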

if filename is None: return
base, ext = os.path.splitext(os.path.basename(filename))

if ext == '.h5' and chunk is not None:
print(colored("\nHDF5 file upload with chunksize is not supported yet. Try without "
"--chunksize\n","red"))
return

if name == '':
table = base
else:
@@ -1882,34 +1914,85 @@ def do_load_table(self, line, name=''):
return

try:
data = self.load_data(filename)
data, iterator = self.load_data(filename)
except:
print_exception()
return

# Get the data in a way that Oracle understands
columns = data.ea_get_columns()
values = data.ea_get_values()
dtypes = data.ea_get_dtypes()

# Clean up the original object
del data
iteration = 0
done = False
total_rows = 0
if data.file_type == 'pandas':
while not done:
try:
if iterator:
df = data.get_chunk(chunk)
else:
df = data
df.file_type = 'pandas'
if len(df) == 0: break
if iteration == 0:
dtypes = eafile.get_dtypes(df)
columns = df.columns.values.tolist()
values = df.values.tolist()
total_rows += len(df)
except:
break
if iteration == 0:
try:
self.create_table(table, columns, dtypes)
except:
print_exception()
self.drop_table(table)
return
try:
if not done:
self.insert_data(table, columns, values, dtypes, iteration)
iteration += 1
if not iterator: done =True
except:
print_exception()
self.drop_table(table)
return

try:
self.create_table(table, columns, dtypes)
except:
print_exception()
self.drop_table(table)
return
if data.file_type == 'fits':
if chunk is None: chunk = data[1].get_nrows()
start = 0
while not done:
try:
df = data
if iteration == 0:
dtypes = eafile.get_dtypes(df)
columns = df[1].get_colnames()
values = df[1][start:start+chunk].tolist()
start += chunk
if len(values) == 0 : break
total_rows += len(values)
except:
break
if iteration == 0:
try:
self.create_table(table, columns, dtypes)
except:
print_exception()
self.drop_table(table)
return

try:
self.insert_data(table, columns, values, dtypes)
except:
print_exception()
self.drop_table(table)
return
try:
if not done:
self.insert_data(table, columns, values, dtypes, iteration)
iteration += 1
except:
print_exception()
self.drop_table(table)
return

print(colored('\n Table %s loaded successfully.\n' % table.upper(), "green"))


print(colored('\n ** Table %s loaded successfully with %d rows.\n' % (table.upper(), total_rows),
"green"))
print(colored(' You may want to refresh the metadata so your new table appears during\n autocompletion',"cyan"))
print(colored(' DESDB ~> refresh_metadata_cache;',"cyan"))

@@ -1927,24 +2010,53 @@ def do_append_table(self, line, name=''):
"""
DB:Appends a table from a file (csv or fits) taking name from filename and columns from header.
Usage: append_table <filename>
Usage: append_table <filename> [--tablename NAME] [--chunksize CHUNK]
Ex: example.csv has the following content
RA,DEC,MAG
1.23,0.13,23
0.13,0.01,22
This command will append the contents of example.csv to the table named EXAMPLE.
It is meant to use after load_table command
Optional Arguments:
--tablename NAME given name for the table, default is taken from filename
--chunksize CHUNK Number of rows to be inserted at a time. Useful for large files
that do not fit in memory
Note: - For csv or tab files, first line must have the column names (without # or any other comment) and same format
as data (using ',' or space)
- For fits file header must have columns names and data types
- For filenames use <table_name>.csv or <table_name>.fits do not use extra points
"""

filename = self.get_filename(line)
line = line.replace(';','')
append_parser = KeyParser(prog='', usage='', add_help=False)
append_parser.add_argument('filename', help='name for the file', action='store', default=None)
append_parser.add_argument('--tablename', help='name for the table to append to', action='store', default='')
append_parser.add_argument('--chunksize', help='number of rows to read in blocks to avoid memory '
'issues', action='store', default=None, type=int)
append_parser.add_argument('-h', '--help', help='print help', action='store_true')
try:
append_args = append_parser.parse_args(line.split())
except SystemExit:
self.do_help('append_table')
return
if append_args.help:
self.do_help('append_table')
return
filename = self.get_filename(append_args.filename)
name = append_args.tablename
chunk = append_args.chunksize
if filename is None: return
base, ext = os.path.splitext(os.path.basename(filename))

if ext == '.h5' and chunk is not None:
print(colored("\nHDF5 file upload with chunksize is not supported yet. Try without "
"--chunksize\n","red"))
return


if name == '':
table = base
else:
@@ -1956,23 +2068,65 @@ def do_append_table(self, line, name=''):
'\n DESDB ~> CREATE TABLE %s (COL1 TYPE1(SIZE), ..., COLN TYPEN(SIZE));\n' % table.upper())
return
try:
data = self.load_data(filename)
data, iterator = self.load_data(filename)
except:
print_exception()
return

columns = data.ea_get_columns()
values = data.ea_get_values()
dtypes = data.ea_get_dtypes()
del data

try:
self.insert_data(table, columns, values, dtypes)
except:
print_exception()
return
iteration = 0
done = False
total_rows = 0
if data.file_type == 'pandas':
while not done:
try:
if iterator:
df = data.get_chunk(chunk)
else:
df = data
df.file_type = 'pandas'
if len(df) == 0: break
if iteration == 0:
dtypes = eafile.get_dtypes(df)
columns = df.columns.values.tolist()
values = df.values.tolist()
total_rows += len(df)
except:
break
try:
if not done:
self.insert_data(table, columns, values, dtypes, iteration)
iteration += 1
if not iterator: done = True
except:
print_exception()
return

if data.file_type == 'fits':
if chunk is None: chunk = data[1].get_nrows()
start = 0
while not done:
try:
df = data
if iteration == 0:
dtypes = eafile.get_dtypes(df)
columns = df[1].get_colnames()
values = df[1][start:start+chunk].tolist()
start += chunk
if len(values) == 0 : break
total_rows += len(values)
except:
break
try:
if not done:
self.insert_data(table, columns, values, dtypes, iteration)
iteration += 1
except:
print_exception()
return
Collaborator:
Seems like a lot of code duplicated between do_load_table and do_append_table. Also, can't we abstract away the difference between pandas and fits data at this level? Specifically, providing a single interface to grabbing chunks (probably code to add in fileio.py)?

Owner Author:
Yes, there is a lot. I wanted to get it done first, but yes, there is repetition inside load_table and append_table and between them. There is room to reduce it.
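A rough sketch of the single chunk interface suggested above; this is hypothetical code that could live in fileio.py, not part of this PR, and it assumes the same data/iterator objects that load_data() returns in this diff:

```python
def get_chunks(data, iterator, chunk):
    """Yield (columns, values) blocks regardless of the underlying reader."""
    if data.file_type == 'pandas':
        while True:
            try:
                df = data.get_chunk(chunk) if iterator else data
            except StopIteration:
                break
            if len(df) == 0:
                break
            yield df.columns.values.tolist(), df.values.tolist()
            if not iterator:
                break
    elif data.file_type == 'fits':
        nrows = data[1].get_nrows()
        size = chunk if chunk is not None else nrows
        start = 0
        while start < nrows:
            values = data[1][start:start + size].tolist()
            if len(values) == 0:
                break
            yield data[1].get_colnames(), values
            start += size

# do_load_table and do_append_table could then share one loop, e.g.:
# for niter, (columns, values) in enumerate(get_chunks(data, iterator, chunk)):
#     self.insert_data(table, columns, values, dtypes, niter)
```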


print(colored('\n Table %s appended successfully.' % table.upper(), "green"))
print(colored('\n ** Table %s appended successfully with %d rows.' % (table.upper(), total_rows),
"green"))


def complete_append_table(self, text, line, start_idx, end_idx):
@@ -2254,10 +2408,18 @@ def initial_message(quiet=False, clear=True):
help="Loads a sql command, execute it and exit")
parser.add_argument("-lt", "--load_table", dest='loadtable',
help="Loads data from a csv, tab, or fits formatted file \
into a DB table using the filename as the table name")
into a DB table using the filename as the table name or a custom \
name with --tablename MYTABLE")
parser.add_argument("-at", "--append_table", dest='appendtable',
help="Appends data from a csv, tab, or fits formatted file \
into a DB table using the filename as the table name")
into a DB table using the filename as the table name or a custom \
name with --tablename MYTABLE")
parser.add_argument("--tablename", dest='tablename',
help="Custom table name to be used with --load_table\
or --append_table")
parser.add_argument("--chunksize", dest='chunksize', type=int, default = None,
help="Number of rows to be inserted at a time. Useful for large files \
that do not fit in memory. Use with --load_table")
parser.add_argument("-s", "--db",dest='db', #choices=[...]?
help="Override database name [dessci,desoper,destest]")
parser.add_argument("-q", "--quiet", action="store_true", dest='quiet',
@@ -2377,12 +2539,20 @@ def colored(line, color): return line
initial_message(args.quiet, clear=False)
cmdinterp = easy_or(conf, desconf, db, interactive=False, quiet=args.quiet)
linein = "load_table " + args.loadtable
if args.tablename is not None:
linein += ' --tablename ' + args.tablename
if args.chunksize is not None:
linein += ' --chunksize ' + str(args.chunksize)
cmdinterp.onecmd(linein)
os._exit(0)
elif args.appendtable is not None:
initial_message(args.quiet, clear=False)
cmdinterp = easy_or(conf, desconf, db, interactive=False, quiet=args.quiet)
linein = "append_table " + args.appendtable
if args.tablename is not None:
linein += ' --tablename ' + args.tablename
if args.chunksize is not None:
linein += ' --chunksize ' + str(args.chunksize)
cmdinterp.onecmd(linein)
os._exit(0)
else:
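For reference, a self-contained sketch of the chunked-load pattern this PR implements, written with plain pandas and a stand-in insert function; the names below are illustrative and not the easyaccess API:

```python
import pandas as pd

def insert_rows(table, columns, values, niter):
    # Stand-in for insert_data(); a real implementation would bind the
    # rows into an INSERT statement via cx_Oracle's executemany().
    print('[Iter: %d] inserting %d rows into %s' % (niter + 1, len(values), table))

def load_in_chunks(csv_path, table, chunksize=50000):
    # read_csv with chunksize returns an iterator of DataFrames, so the
    # whole file never has to fit in memory at once.
    reader = pd.read_csv(csv_path, chunksize=chunksize)
    for niter, df in enumerate(reader):
        insert_rows(table, df.columns.values.tolist(), df.values.tolist(), niter)

# Example: load_in_chunks('example.csv', 'EXAMPLE', chunksize=2)
```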