Skip to content

Commit

Permalink
A little refactoring to improve inference and precompilation (#880)
Browse files Browse the repository at this point in the history
* A little refactoring to improve inference and precompilation

The changes in files that are not precompile.jl are inference
improvements; mainly from inspecting results of `@code_typed`,
Cthulhu.jl, and SnoopCompile.jl. The changes in precompile.jl are from
comments from @timholy recommending that in our precompile process, we
can just call regular code instead of needing to call `precompile` with
methods/arg types. I'm aware I don't understand all the details around
precompilation, method invalidation, etc. but unfortunately, I feel a
bit blocked with CSV.jl's precompilation. With the changes in #875, we
now see a fixed overhead of allocations when parsing due, I'm told, to
an issue in Base Julia
(JuliaLang/julia#34055).
  • Loading branch information
quinnj authored Sep 1, 2021
1 parent 79f856f commit 4efa2f2
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 336 deletions.
8 changes: 8 additions & 0 deletions src/CSV.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const DEFAULT_MAX_INLINE_STRING_LENGTH = 32
const TRUE_STRINGS = ["true", "True", "TRUE", "T", "1"]
const FALSE_STRINGS = ["false", "False", "FALSE", "F", "0"]
const ValidSources = Union{Vector{UInt8}, SubArray{UInt8, 1, Vector{UInt8}}, IO, Cmd, AbstractString, AbstractPath}
const MAX_INPUT_SIZE = Int64(2)^42
const EMPTY_INT_ARRAY = Int64[]

include("keyworddocs.jl")
include("utils.jl")
Expand Down Expand Up @@ -67,4 +69,10 @@ end
include("precompile.jl")
_precompile_()

# Module initialization hook; Julia runs `__init__` each time the module is loaded.
# It is deliberately a no-op at the moment: the warm-up calls listed below are
# kept only as commented-out code, so nothing is executed here.
function __init__()
    # CSV.File(IOBuffer(PRECOMPILE_DATA))
    # foreach(row -> row, CSV.Rows(IOBuffer(PRECOMPILE_DATA)))
    # CSV.File(joinpath(dirname(pathof(CSV)), "..", "test", "testfiles", "promotions.csv"))
    return nothing
end

end # module
172 changes: 105 additions & 67 deletions src/context.jl
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ function checkinvalidcolumns(dict, argname, ncols, names)
return
end

# Error helper: always throws an `ArgumentError` whose message embeds the offending
# `types` value. Marked `@noinline` so the message construction is kept out of the
# body of whichever function calls it.
@noinline function nonconcretetypes(types)
    msg = "Non-concrete types passed in `types` keyword argument, please provide concrete types for columns: $types"
    throw(ArgumentError(msg))
end

struct Context
transpose::Val
transpose::Bool
name::String
names::Vector{Symbol}
rowsguess::Int64
Expand Down Expand Up @@ -139,7 +141,7 @@ struct Context
streaming::Bool
end

@refargs function Context(source,
@refargs function Context(source::ValidSources,
# file options
# header can be a row number, range of rows, or actual string vector
header::Union{Integer, Vector{Symbol}, Vector{String}, AbstractVector{<:Integer}},
Expand All @@ -150,39 +152,39 @@ end
transpose::Bool,
comment::Union{String, Nothing},
ignoreemptyrows::Bool,
ignoreemptylines,
ignoreemptylines::Union{Nothing, Bool},
select,
drop,
limit::Union{Integer, Nothing},
buffer_in_memory::Bool,
threaded,
threaded::Union{Nothing, Bool},
ntasks::Union{Nothing, Integer},
tasks,
tasks::Union{Nothing, Integer},
rows_to_check::Integer,
lines_to_check,
lines_to_check::Union{Nothing, Integer},
# parsing options
missingstrings,
missingstring,
delim,
missingstrings::Union{Nothing, String, Vector{String}},
missingstring::Union{Nothing, String, Vector{String}},
delim::Union{Nothing, UInt8, Char, String},
ignorerepeated::Bool,
quoted::Bool,
quotechar,
openquotechar,
closequotechar,
escapechar,
dateformat,
dateformats,
decimal,
truestrings,
falsestrings,
quotechar::Union{UInt8, Char},
openquotechar::Union{Nothing, UInt8, Char},
closequotechar::Union{Nothing, UInt8, Char},
escapechar::Union{UInt8, Char},
dateformat::Union{Nothing, String, Dates.DateFormat, Parsers.Format, AbstractVector, AbstractDict},
dateformats::Union{Nothing, String, Dates.DateFormat, Parsers.Format, AbstractVector, AbstractDict},
decimal::Union{UInt8, Char},
truestrings::Union{Nothing, Vector{String}},
falsestrings::Union{Nothing, Vector{String}},
# type options
type,
types,
typemap,
pool,
type::Union{Nothing, Type},
types::Union{Nothing, Type, AbstractVector, AbstractDict},
typemap::Dict,
pool::Union{Bool, Real, AbstractVector, AbstractDict},
downcast::Bool,
lazystrings,
stringtype,
lazystrings::Bool,
stringtype::StringTypes,
strict::Bool,
silencewarnings::Bool,
maxwarnings::Integer,
Expand All @@ -192,13 +194,15 @@ end

# initial argument validation and adjustment
@inbounds begin
!isa(source, IO) && !isa(source, AbstractVector{UInt8}) && !isa(source, Cmd) && !isfile(source) &&
throw(ArgumentError("\"$source\" is not a valid file or doesn't exist"))
((source isa AbstractString || source isa AbstractPath) && !isfile(source)::Bool) && throw(ArgumentError("\"$source\" is not a valid file or doesn't exist"))
if types !== nothing
if types isa AbstractVector || types isa AbstractDict
any(x->!concrete_or_concreteunion(x), types isa AbstractDict ? values(types) : types) && throw(ArgumentError("Non-concrete types passed in `types` keyword argument, please provide concrete types for columns: $types"))
if types isa AbstractVector
any(x->!concrete_or_concreteunion(x), types) && nonconcretetypes(types)
elseif types isa AbstractDict
typs = values(types)
any(x->!concrete_or_concreteunion(x), typs) && nonconcretetypes(typs)
else
concrete_or_concreteunion(types) || throw(ArgumentError("Non-concrete types passed in `types` keyword argument, please provide concrete types for columns: $types"))
concrete_or_concreteunion(types) || nonconcretetypes(types)
end
end
checkvaliddelim(delim)
Expand Down Expand Up @@ -239,14 +243,28 @@ end
Base.depwarn("`threaded` keyword argument is deprecated; to avoid multithreaded parsing, pass `ntasks=1`", :Context)
ntasks = threaded ? Threads.nthreads() : 1
end
header = (isa(header, Integer) && header == 1 && skipto == 1) ? -1 : header
isa(header, Integer) && skipto != -1 && (skipto > header || throw(ArgumentError("data row ($skipto) must come after header row ($header)")))
skipto = skipto == -1 ? (isa(header, Vector{Symbol}) || isa(header, Vector{String}) ? 0 : last(header)) + 1 : skipto # by default, data starts on line after header
if header isa Integer
if header == 1 && skipto == 1
header = -1
elseif skipto != -1 && skipto < header
throw(ArgumentError("skipto row ($skipto) must come after header row ($header)"))
end
end
if skipto == -1
if isa(header, Vector{Symbol}) || isa(header, Vector{String})
skipto = 0
elseif header isa Integer
# by default, data starts on line after header
skipto = header + 1
elseif header isa AbstractVector{<:Integer}
skipto = last(header) + 1
end
end
debug && println("header is: $header, skipto computed as: $skipto")
# getsource will turn any input into a `AbstractVector{UInt8}`
buf, pos, len, tempfile = getsource(source, buffer_in_memory)
if len > Int64(2)^42
throw(ArgumentError("delimited source to parse too large; must be < $(2^42) bytes"))
if len > MAX_INPUT_SIZE
throw(ArgumentError("delimited source to parse too large; must be < $MAX_INPUT_SIZE bytes"))
end
# skip over initial BOM character, if present
pos = consumeBOM(buf, pos)
Expand All @@ -259,9 +277,12 @@ end
sentinel = missingstring === nothing ? missingstring : (isempty(missingstring) || (missingstring isa Vector && length(missingstring) == 1 && missingstring[1] == "")) ? missing : missingstring isa String ? [missingstring] : missingstring

if delim === nothing
del = isa(source, AbstractString) && endswith(source, ".tsv") ? UInt8('\t') :
isa(source, AbstractString) && endswith(source, ".wsv") ? UInt8(' ') :
UInt8('\n')
if source isa AbstractString || source isa AbstractPath
filename = string(source)
del = endswith(filename, ".tsv") ? UInt8('\t') : endswith(filename, ".wsv") ? UInt8(' ') : UInt8('\n')
else
del = UInt8('\n')
end
else
del = (delim isa Char && isascii(delim)) ? delim % UInt8 :
(sizeof(delim) == 1 && isascii(delim)) ? delim[1] % UInt8 : delim
Expand All @@ -278,32 +299,47 @@ end
end

df = dateformat isa AbstractVector || dateformat isa AbstractDict ? nothing : dateformat
wh1 = UInt8(' ')
wh2 = UInt8('\t')
if sentinel isa Vector
for sent in sentinel
if contains(sent, " ")
wh1 = 0x00
end
if contains(sent, "\t")
wh2 = 0x00
end
end
end
headerpos = datapos = pos
if !transpose
# step 1: detect the byte position where the column names start (headerpos)
# and where the first data row starts (datapos)
headerpos, datapos = detectheaderdatapos(buf, pos, len, oq, eq, cq, cmt, ignoreemptyrows, header, skipto)
debug && println("headerpos = $headerpos, datapos = $datapos")

# step 2: detect delimiter (or use given) and detect number of (estimated) rows and columns
d, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, del, cmt, ignoreemptyrows)
debug && println("estimated rows: $rowsguess")
debug && println("detected delimiter: \"$(escape_string(d isa UInt8 ? string(Char(d)) : d))\"")

# step 3: build Parsers.Options w/ parsing arguments
wh1 = d == UInt(' ') ? 0x00 : UInt8(' ')
wh2 = d == UInt8('\t') ? 0x00 : UInt8('\t')
if sentinel isa Vector
for sent in sentinel
if contains(sent, " ")
wh1 = 0x00
end
if contains(sent, "\t")
wh2 = 0x00
end
end
end
end
# step 2: detect delimiter (or use given) and detect number of (estimated) rows and columns
# step 3: build Parsers.Options w/ parsing arguments
if del isa UInt8
d, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows, del)
wh1 = d == UInt(' ') ? 0x00 : wh1
wh2 = d == UInt8('\t') ? 0x00 : wh2
options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, d, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
elseif del isa Char
_, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows)
options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
d = del
elseif del isa String
_, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows)
options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
d = del
else
error("invalid delim type")
end
debug && println("estimated rows: $rowsguess")
debug && println("detected delimiter: \"$(escape_string(d isa UInt8 ? string(Char(d)) : d))\"")

if !transpose
# step 4a: if we're ignoring repeated delimiters, then we ignore any
# that start a row, so we need to check if we need to adjust our headerpos/datapos
if ignorerepeated
Expand All @@ -318,10 +354,6 @@ end
ncols = length(names)
else
# transpose
d, rowsguess = detectdelimandguessrows(buf, pos, pos, len, oq, eq, cq, del, cmt, ignoreemptyrows)
wh1 = d == UInt(' ') ? 0x00 : UInt8(' ')
wh2 = d == UInt8('\t') ? 0x00 : UInt8('\t')
options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, d, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug)
rowsguess, names, positions, endpositions = detecttranspose(buf, pos, len, options, header, skipto, normalizenames)
ncols = length(names)
datapos = isempty(positions) ? 0 : positions[1]
Expand Down Expand Up @@ -357,7 +389,7 @@ end
else
T = types === nothing ? (streaming ? Union{stringtype, Missing} : NeedsTypeDetection) : types
columns = Vector{Column}(undef, ncols)
foreach(1:ncols) do i
for i = 1:ncols
col = Column(T, options)
columns[i] = col
end
Expand Down Expand Up @@ -427,7 +459,7 @@ end
end
elseif select isa Base.Callable
for i = 1:ncols
select(i, names[i]) || willdrop!(columns, i)
select(i, names[i])::Bool || willdrop!(columns, i)
end
else
throw(ArgumentError("`select` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`"))
Expand All @@ -448,7 +480,7 @@ end
end
elseif drop isa Base.Callable
for i = 1:ncols
drop(i, names[i]) && willdrop!(columns, i)
drop(i, names[i])::Bool && willdrop!(columns, i)
end
else
throw(ArgumentError("`drop` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`"))
Expand All @@ -459,7 +491,7 @@ end
# determine if we can use threads while parsing
limit = something(limit, typemax(Int64))
minrows = min(limit, rowsguess)
nthreads = something(ntasks, Threads.nthreads())
nthreads = Int(something(ntasks, Threads.nthreads()))
if ntasks === nothing && !streaming && nthreads > 1 && !transpose && minrows > (nthreads * 5) && (minrows * ncols) >= 5_000
threaded = true
ntasks = nthreads
Expand Down Expand Up @@ -487,6 +519,7 @@ end
# but we also don't guarantee limit will be exact w/ multithreaded parsing
origrowsguess = rowsguess
if limit !== typemax(Int64)
limit = Int64(limit)
limitposguess = ceil(Int64, (limit / (origrowsguess * 0.8)) * len)
newlen = [0, limitposguess, min(limitposguess * 2, len)]
findrowstarts!(buf, options, newlen, ncols, columns, stringtype, downcast, 5)
Expand All @@ -495,7 +528,10 @@ end
debug && println("limiting, adjusting len to $len")
end
chunksize = div(len - datapos, ntasks)
chunkpositions = [i == 0 ? datapos : i == ntasks ? len : (datapos + chunksize * i) for i = 0:ntasks]
chunkpositions = Vector{Int64}(undef, ntasks + 1)
for i = 0:ntasks
chunkpositions[i + 1] = i == 0 ? datapos : i == ntasks ? len : (datapos + chunksize * i)
end
debug && println("initial byte positions before adjusting for start of rows: $chunkpositions")
avgbytesperrow, successfullychunked = findrowstarts!(buf, options, chunkpositions, ncols, columns, stringtype, downcast, rows_to_check)
if successfullychunked
Expand All @@ -506,7 +542,9 @@ end
debug && println("multi-threaded column types sampled as: $columns")
# check if we need to adjust column pooling
if finalpool == 0.0 || finalpool == 1.0
foreach(col -> col.pool = finalpool, columns)
for col in columns
col.pool = finalpool
end
end
else
debug && println("something went wrong chunking up a file for multithreaded parsing, falling back to single-threaded parsing")
Expand All @@ -521,7 +559,7 @@ end

end # @inbounds begin
return Context(
Val(transpose),
transpose,
getname(source),
names,
rowsguess,
Expand Down
Loading

0 comments on commit 4efa2f2

Please sign in to comment.