Skip to content

Commit

Permalink
Merge pull request #66 from mgckind/chunks
Browse files Browse the repository at this point in the history
load big files in chunks
  • Loading branch information
mgckind committed Apr 27, 2016
2 parents 1b360b9 + 66f91c6 commit af6c4c0
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 55 deletions.
250 changes: 210 additions & 40 deletions easyaccess/easyaccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def colored(line, color): return line
import webbrowser
import signal

class KeyParser(argparse.ArgumentParser):
    """Argument parser used for the in-shell commands.

    The stock ``ArgumentParser.error`` prints a usage message before
    exiting. This variant exits straight away with status 2, so the calling
    command can catch ``SystemExit`` and show its own help text.
    """

    def error(self, message):
        """Abort parsing with exit status 2; *message* is discarded."""
        raise SystemExit(2)

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p')

Expand Down Expand Up @@ -1799,7 +1803,7 @@ def create_table(self, table, columns, dtypes):
self.cur.execute(qtable)
if self.autocommit: self.con.commit()

def insert_data(self, table, columns, values, dtypes=None):
def insert_data(self, table, columns, values, dtypes=None, niter = 0):
"""Insert data into a DB table.
Trim trailing whitespace from string columns. Because of the
Expand Down Expand Up @@ -1845,31 +1849,59 @@ def insert_data(self, table, columns, values, dtypes=None):
raise cx_Oracle.DatabaseError(msg)

print(colored(
'\n Inserted %d rows and %d columns into table %s in %.2f seconds' % (
len(values), len(columns), table.upper(), t2 - t1), "green"))
'\n [Iter: %d] Inserted %d rows and %d columns into table %s in %.2f seconds' % (
niter+1, len(values), len(columns), table.upper(), t2 - t1), "green"))


def do_load_table(self, line, name=''):
"""
DB:Loads a table from a file (csv or fits) taking name from filename and columns from header
Usage: load_table <filename>
Usage: load_table <filename> [--tablename NAME] [--chunksize CHUNK]
Ex: example.csv has the following content
RA,DEC,MAG
1.23,0.13,23
0.13,0.01,22
This command will create a table named EXAMPLE with 3 columns RA,DEC and MAG and values taken from file
Optional Arguments:
--tablename NAME given name for the table, default is taken from filename
--chunksize CHUNK Number of rows to be inserted at a time. Useful for large files
that do not fit in memory
Note: - For csv or tab files, first line must have the column names (without # or any other comment) and same format
as data (using ',' or space)
- For fits file header must have columns names and data types
- For filenames use <table_name>.csv or <table_name>.fits; do not use extra dots in the filename
"""
filename = self.get_filename(line)
line = line.replace(';','')
load_parser = KeyParser(prog='', usage='', add_help=False)
load_parser.add_argument('filename', help='name for the file', action='store', default=None)
load_parser.add_argument('--tablename', help='name for the table', action='store', default='')
load_parser.add_argument('--chunksize', help='number of rows to read in blocks to avoid memory '
'issues', action='store', type=int, default=None)
load_parser.add_argument('-h', '--help', help='print help', action='store_true')
try:
load_args = load_parser.parse_args(line.split())
except SystemExit:
self.do_help('load_table')
return
if load_args.help:
self.do_help('load_table')
return
filename = self.get_filename(load_args.filename)
name = load_args.tablename
chunk = load_args.chunksize
if filename is None: return
base, ext = os.path.splitext(os.path.basename(filename))

if ext == '.h5' and chunk is not None:
print(colored("\nHDF5 file upload with chunksize is not supported yet. Try without "
"--chunksize\n","red"))
return

if name == '':
table = base
else:
Expand All @@ -1882,34 +1914,85 @@ def do_load_table(self, line, name=''):
return

try:
data = self.load_data(filename)
data, iterator = self.load_data(filename)
except:
print_exception()
return

# Get the data in a way that Oracle understands
columns = data.ea_get_columns()
values = data.ea_get_values()
dtypes = data.ea_get_dtypes()

# Clean up the original object
del data
iteration = 0
done = False
total_rows = 0
if data.file_type == 'pandas':
while not done:
try:
if iterator:
df = data.get_chunk(chunk)
else:
df = data
df.file_type = 'pandas'
if len(df) == 0: break
if iteration == 0:
dtypes = eafile.get_dtypes(df)
columns = df.columns.values.tolist()
values = df.values.tolist()
total_rows += len(df)
except:
break
if iteration == 0:
try:
self.create_table(table, columns, dtypes)
except:
print_exception()
self.drop_table(table)
return
try:
if not done:
self.insert_data(table, columns, values, dtypes, iteration)
iteration += 1
if not iterator: done =True
except:
print_exception()
self.drop_table(table)
return

try:
self.create_table(table, columns, dtypes)
except:
print_exception()
self.drop_table(table)
return
if data.file_type == 'fits':
if chunk is None: chunk = data[1].get_nrows()
start = 0
while not done:
try:
df = data
if iteration == 0:
dtypes = eafile.get_dtypes(df)
columns = df[1].get_colnames()
values = df[1][start:start+chunk].tolist()
start += chunk
if len(values) == 0 : break
total_rows += len(values)
except:
break
if iteration == 0:
try:
self.create_table(table, columns, dtypes)
except:
print_exception()
self.drop_table(table)
return

try:
self.insert_data(table, columns, values, dtypes)
except:
print_exception()
self.drop_table(table)
return
try:
if not done:
self.insert_data(table, columns, values, dtypes, iteration)
iteration += 1
except:
print_exception()
self.drop_table(table)
return

print(colored('\n Table %s loaded successfully.\n' % table.upper(), "green"))


print(colored('\n ** Table %s loaded successfully with %d rows.\n' % (table.upper(), total_rows),
"green"))
print(colored(' You may want to refresh the metadata so your new table appears during\n autocompletion',"cyan"))
print(colored(' DESDB ~> refresh_metadata_cache;',"cyan"))

Expand All @@ -1927,24 +2010,53 @@ def do_append_table(self, line, name=''):
"""
DB:Appends a table from a file (csv or fits) taking name from filename and columns from header.
Usage: append_table <filename>
Usage: append_table <filename> [--tablename NAME] [--chunksize CHUNK]
Ex: example.csv has the following content
RA,DEC,MAG
1.23,0.13,23
0.13,0.01,22
This command will append the contents of example.csv to the table named EXAMPLE.
It is meant to use after load_table command
Optional Arguments:
--tablename NAME given name for the table, default is taken from filename
--chunksize CHUNK Number of rows to be inserted at a time. Useful for large files
that do not fit in memory
Note: - For csv or tab files, first line must have the column names (without # or any other comment) and same format
as data (using ',' or space)
- For fits file header must have columns names and data types
- For filenames use <table_name>.csv or <table_name>.fits; do not use extra dots in the filename
"""

filename = self.get_filename(line)
line = line.replace(';','')
append_parser = KeyParser(prog='', usage='', add_help=False)
append_parser.add_argument('filename', help='name for the file', action='store', default=None)
append_parser.add_argument('--tablename', help='name for the table to append to', action='store', default='')
append_parser.add_argument('--chunksize', help='number of rows to read in blocks to avoid memory '
'issues', action='store', default=None, type=int)
append_parser.add_argument('-h', '--help', help='print help', action='store_true')
try:
append_args = append_parser.parse_args(line.split())
except SystemExit:
self.do_help('append_table')
return
if append_args.help:
self.do_help('append_table')
return
filename = self.get_filename(append_args.filename)
name = append_args.tablename
chunk = append_args.chunksize
if filename is None: return
base, ext = os.path.splitext(os.path.basename(filename))

if ext == '.h5' and chunk is not None:
print(colored("\nHDF5 file upload with chunksize is not supported yet. Try without "
"--chunksize\n","red"))
return


if name == '':
table = base
else:
Expand All @@ -1956,23 +2068,65 @@ def do_append_table(self, line, name=''):
'\n DESDB ~> CREATE TABLE %s (COL1 TYPE1(SIZE), ..., COLN TYPEN(SIZE));\n' % table.upper())
return
try:
data = self.load_data(filename)
data, iterator = self.load_data(filename)
except:
print_exception()
return

columns = data.ea_get_columns()
values = data.ea_get_values()
dtypes = data.ea_get_dtypes()
del data

try:
self.insert_data(table, columns, values, dtypes)
except:
print_exception()
return
iteration = 0
done = False
total_rows = 0
if data.file_type == 'pandas':
while not done:
try:
if iterator:
df = data.get_chunk(chunk)
else:
df = data
df.file_type = 'pandas'
if len(df) == 0: break
if iteration == 0:
dtypes = eafile.get_dtypes(df)
columns = df.columns.values.tolist()
values = df.values.tolist()
total_rows += len(df)
except:
break
try:
if not done:
self.insert_data(table, columns, values, dtypes, iteration)
iteration += 1
if not iterator: done = True
except:
print_exception()
return

if data.file_type == 'fits':
if chunk is None: chunk = data[1].get_nrows()
start = 0
while not done:
try:
df = data
if iteration == 0:
dtypes = eafile.get_dtypes(df)
columns = df[1].get_colnames()
values = df[1][start:start+chunk].tolist()
start += chunk
if len(values) == 0 : break
total_rows += len(values)
except:
break
try:
if not done:
self.insert_data(table, columns, values, dtypes, iteration)
iteration += 1
except:
print_exception()
return

print(colored('\n Table %s appended successfully.' % table.upper(), "green"))
print(colored('\n ** Table %s appended successfully with %d rows.' % (table.upper(), total_rows),
"green"))


def complete_append_table(self, text, line, start_idx, end_idx):
Expand Down Expand Up @@ -2254,10 +2408,18 @@ def initial_message(quiet=False, clear=True):
help="Loads a sql command, execute it and exit")
parser.add_argument("-lt", "--load_table", dest='loadtable',
help="Loads data from a csv, tab, or fits formatted file \
into a DB table using the filename as the table name")
into a DB table using the filename as the table name or a custom \
name with --tablename MYTABLE")
parser.add_argument("-at", "--append_table", dest='appendtable',
help="Appends data from a csv, tab, or fits formatted file \
into a DB table using the filename as the table name")
into a DB table using the filename as the table name or a custom \
name with --tablename MYTABLE")
parser.add_argument("--tablename", dest='tablename',
help="Custom table name to be used with --load_table\
or --append_table")
parser.add_argument("--chunksize", dest='chunksize', type=int, default = None,
help="Number of rows to be inserted at a time. Useful for large files \
that do not fit in memory. Use with --load_table or --append_table")
parser.add_argument("-s", "--db",dest='db', #choices=[...]?
help="Override database name [dessci,desoper,destest]")
parser.add_argument("-q", "--quiet", action="store_true", dest='quiet',
Expand Down Expand Up @@ -2377,12 +2539,20 @@ def colored(line, color): return line
initial_message(args.quiet, clear=False)
cmdinterp = easy_or(conf, desconf, db, interactive=False, quiet=args.quiet)
linein = "load_table " + args.loadtable
if args.tablename is not None:
linein += ' --tablename ' + args.tablename
if args.chunksize is not None:
linein += ' --chunksize ' + str(args.chunksize)
cmdinterp.onecmd(linein)
os._exit(0)
elif args.appendtable is not None:
initial_message(args.quiet, clear=False)
cmdinterp = easy_or(conf, desconf, db, interactive=False, quiet=args.quiet)
linein = "append_table " + args.appendtable
if args.tablename is not None:
linein += ' --tablename ' + args.tablename
if args.chunksize is not None:
linein += ' --chunksize ' + str(args.chunksize)
cmdinterp.onecmd(linein)
os._exit(0)
else:
Expand Down
Loading

0 comments on commit af6c4c0

Please sign in to comment.