-
Notifications
You must be signed in to change notification settings - Fork 59
/
utils.jl
141 lines (121 loc) · 4.55 KB
/
utils.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Round `n` up to the next multiple of `alignment`: the total number of bytes that
# `n` bytes of data occupy once padded out to an `alignment` boundary.
# The Arrow format requires buffers to be aligned to (at least) 8-byte boundaries;
# the caller chooses the concrete `alignment`.
function padding(n::Integer, alignment)
    nblocks = div(n + alignment - 1, alignment)
    return nblocks * alignment
end
# Number of filler bytes needed after `n` bytes of data to reach the next
# `alignment` boundary (0 when `n` is already a multiple of `alignment`).
function paddinglength(n::Integer, alignment)
    padded = padding(n, alignment)
    return padded - n
end
# Write `n` zero bytes (0x00) to `io`, one byte per `Base.write` call, and return
# the sum of the byte counts reported by those calls. Nothing is written when n <= 0.
function writezeros(io::IO, n::Integer)
    written = 0
    remaining = n
    while remaining > 0
        written += Base.write(io, 0x00)
        remaining -= 1
    end
    return written
end
# efficient writing of arrays
# Write the elements of `col` to `io` and return the number of bytes written.
# The element type passed on is `maybemissing(eltype(col))`, i.e. the element type
# with `Missing` stripped (a pure `Missing` element type is kept as `Missing`).
writearray(io, col) = writearray(io, maybemissing(eltype(col)), col)
# Write `col` to `io` as a sequence of `T` values, picking a path based on the
# concrete container type of `col`; returns the number of bytes written to `io`.
function writearray(io::IO, ::Type{T}, col) where {T}
    if col isa Vector{T}
        # fast path: hand the whole vector to `Base.write` in one call
        n = Base.write(io, col)
    elseif isbitstype(T) && (
        col isa Vector{Union{T,Missing}} || col isa SentinelVector{T,T,Missing,Vector{T}}
    )
        # need to write the non-selector bytes of isbits Union Arrays
        # Emits exactly `sizeof(T) * length(col)` raw bytes starting at `pointer(col)`.
        # NOTE(review): slots that hold `missing` are written with whatever bytes the
        # array stores there; presumably readers ignore them via the validity bitmap —
        # confirm. This relies on Julia's inline storage layout for isbits Unions.
        n = Base.unsafe_write(io, pointer(col), sizeof(T) * length(col))
    elseif col isa ChainedVector
        # write each underlying chunk in order; recursing lets each chunk take
        # whichever branch above fits its own container type
        n = 0
        for A in col.arrays
            n += writearray(io, T, A)
        end
    else
        # generic fallback: encode element by element into an in-memory buffer,
        # substituting `ArrowTypes.default(T)` for `missing`, then write it out at once
        n = 0
        # `data` seeds the buffer; presumably just an initial capacity hint — confirm
        data = Vector{UInt8}(undef, sizeof(col))
        buf = IOBuffer(data; write=true)
        for x in col
            n += Base.write(buf, coalesce(x, ArrowTypes.default(T)))
        end
        # replaces the running count above with the bytes actually written to `io`
        n = Base.write(io, take!(buf))
    end
    return n
end
# Return whether bit `n` of `v` is set, counting from 1 at the least significant bit.
getbit(v::UInt8, n::Integer) = ((v >> (n - 1)) & 0x01) == 0x01
# Return `v` with bit `n` (1 = least significant) set when `b` is true, or cleared
# when `b` is false. `v` itself is not modified; a new UInt8 is returned.
function setbit(v::UInt8, b::Bool, n::Integer)
    mask = 0x02^(n - 1)
    return b ? (v | mask) : (v & ~mask)
end
# Number of bytes needed to hold `n` bit-packed values (`cld(n, 8)` bytes), rounded
# up to a multiple of `alignment`.
bitpackedbytes(n::Integer, alignment) = padding(cld(n, 8), alignment)
# Count how many elements of the iterable `col` are `missing`.
nullcount(col) = count(x -> x === missing, col)
# like startswith for strings, but on byte buffers:
# return whether the bytes of `a` starting at index `pos` equal all of `b`.
# Returns `false` (rather than reading out of bounds) when `a` does not contain
# `length(b)` bytes starting at `pos`; an empty `b` always matches.
function _startswith(a::AbstractVector{UInt8}, pos::Integer, b::AbstractVector{UInt8})
    n = length(b)
    # guards the @inbounds loop below; an empty range (n == 0) always passes
    checkbounds(Bool, a, pos:(pos + n - 1)) || return false
    # maps each index of `b` onto the matching index of `a`, whatever `b`'s axes are
    shift = pos - firstindex(b)
    @inbounds for i in eachindex(b)
        a[shift + i] == b[i] || return false
    end
    return true
end
# read a single element from a byte vector
# Load one value of type `T` from the raw bytes `t[pos:pos + sizeof(T) - 1]`, using
# the host's byte order. Adapted from read(::IOBuffer, T) in Base.
# Throws a `BoundsError` if that byte range is not entirely inside `t`.
function readbuffer(t::AbstractVector{UInt8}, pos::Integer, ::Type{T}) where {T}
    # without this, a bad `pos` (e.g. from a truncated or corrupt file) makes
    # `unsafe_load` read memory outside `t`
    checkbounds(t, pos:(pos + sizeof(T) - 1))
    # keep `t` rooted while its raw pointer is in use
    x = GC.@preserve t unsafe_load(convert(Ptr{T}, pointer(t, pos)))
    return x
end
# Choose the dictionary-encoding index type for `n` unique values: the narrowest of
# Int8/Int16/Int32 for which `n` is below half of its `typemax`, otherwise Int64.
function encodingtype(n)
    for IndexT in (Int8, Int16, Int32)
        n < div(typemax(IndexT), 2) && return IndexT
    end
    return Int64
end
# Strip `Missing` from the type `T` (e.g. `Union{Int,Missing}` -> `Int`). The bare
# `Missing` type is returned unchanged instead of collapsing to `Union{}`.
function maybemissing(::Type{T}) where {T}
    if T === Missing
        return Missing
    end
    return Base.nonmissingtype(T)
end
# Return `S`, widened to `Union{Missing,S}` when the reference type admits `missing`.
# For a `Union` reference type this means `Missing <: U`.
function withmissing(U::Union, S)
    if Missing <: U
        return Union{Missing,S}
    else
        return S
    end
end
# For any other reference type, only `Missing` itself triggers the widening.
function withmissing(T, S)
    if T === Missing
        return Union{Missing,S}
    else
        return S
    end
end
# Parse the `Meta.Footer` flatbuffer of an Arrow IPC file held entirely in `filebytes`.
# Expected trailer: footer bytes, then an Int32 footer length, then 6 final bytes that
# are skipped (presumably the "ARROW1" magic — confirm).
function getfooter(filebytes)
    # the Int32 footer length sits in the 4 bytes just before the final 6
    footerlen = readbuffer(filebytes, length(filebytes) - 9, Int32)
    footerstop = lastindex(filebytes) - 10
    footerstart = footerstop - footerlen + 1
    # the copied slice starts with the flatbuffer, so its root is at offset 0
    return FlatBuffers.getrootas(Meta.Footer, filebytes[footerstart:footerstop], 0)
end
# Return a copy of the metadata bytes of the first record batch listed in the footer
# of the Arrow IPC file `filebytes`: the `metaDataLength` bytes starting at the
# block's `offset` (treated as 0-based, hence the +1).
function getrb(filebytes)
    footer = getfooter(filebytes)
    rb = footer.recordBatches[1]
    start = rb.offset + 1
    # inclusive upper bound: exactly `metaDataLength` bytes (the previous bound
    # `start + metaDataLength` included one extra byte)
    return filebytes[start:(start + rb.metaDataLength - 1)]
end
# Parse the encapsulated Arrow IPC message that begins at 1-based index `off` of
# `filebytes`. Layout read here: a 0xFFFFFFFF continuation marker (bytes off:off+3),
# an Int32 metadata length (bytes off+4:off+7), then the `Meta.Message` flatbuffer.
# The default `off=9` skips the first 8 bytes of the file.
# Throws an `ArgumentError` if the continuation marker is not found at `off`.
function readmessage(filebytes, off=9)
    marker = readbuffer(filebytes, off, UInt32)
    # explicit check instead of @assert, which may be disabled at higher optimization levels
    marker === 0xFFFFFFFF || throw(ArgumentError(
        "expected continuation marker 0xFFFFFFFF at byte $off, found $(repr(marker))"))
    # the metadata length is not needed to locate the flatbuffer, so it is not read.
    # NOTE(review): `getfooter` passes offset 0 for a flatbuffer that starts at index 1,
    # which suggests `getrootas` offsets are 0-based; if so this should be `off + 7`.
    # Left unchanged pending confirmation.
    return FlatBuffers.getrootas(Meta.Message, filebytes, off + 8)
end
# Serialize `data` with `write` (forwarding any keyword arguments) into a fresh
# in-memory IOBuffer and return that buffer rewound to its start, ready for reading.
function tobuffer(data; kwargs...)
    buf = IOBuffer()
    write(buf, data; kwargs...)
    # rewind so a reader starts at the first serialized byte
    seekstart(buf)
    return buf
end
# An `ImmutableDict` is already in the target form; return it as is.
toidict(x::Base.ImmutableDict) = x
# ref https://github.com/apache/arrow-julia/pull/238#issuecomment-919415809
# Build a `Base.ImmutableDict` from an iterable of `key => value` pairs. Key/value types
# come from the first pair; an empty input yields an empty `ImmutableDict{String,String}`.
function toidict(pairs)
    isempty(pairs) && return Base.ImmutableDict{String,String}()
    seed = Base.ImmutableDict(first(pairs))
    # layer each remaining pair onto the dict built so far, in iteration order
    return foldl(Base.ImmutableDict, Iterators.drop(pairs, 1); init=seed)
end