forked from snowflakedb/snowflake-connector-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
arrow_iterator.pyx
83 lines (62 loc) · 2.42 KB
/
arrow_iterator.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#
# Copyright (c) 2012-2019 Snowflake Computing Inc. All rights reserved.
#
# distutils: language = c++
from logging import getLogger
from cpython.ref cimport PyObject
logger = getLogger(__name__)

# Units this iterator can work in:
#   EMPTY_UNIT: default
#   ROW_UNIT:   fetch row by row if the user calls `fetchone()`
#   TABLE_UNIT: fetch one arrow table if the user calls `fetch_pandas()`
ROW_UNIT = 'row'
TABLE_UNIT = 'table'
EMPTY_UNIT = ''
# C++ iterator type from namespace ``sf``, as declared in CArrowIterator.hpp.
# NOTE(review): none of these methods is declared ``except +``, so a C++
# exception thrown from them would not be translated into a Python
# exception -- confirm the C++ side never throws from these calls.
cdef extern from "cpp/ArrowIterator/CArrowIterator.hpp" namespace "sf":
    cdef cppclass CArrowIterator:
        # Hand one record batch (passed as a raw PyObject pointer) to the iterator.
        void addRecordBatch(PyObject* rb)

        # Produce the next result as a raw PyObject pointer.
        PyObject* next()

        void reset()
# CArrowIterator subclass from namespace ``sf``, as declared in
# CArrowChunkIterator.hpp.
cdef extern from "cpp/ArrowIterator/CArrowChunkIterator.hpp" namespace "sf":
    cdef cppclass CArrowChunkIterator(CArrowIterator):
        # ``except +`` turns a C++ exception thrown by the constructor into a
        # Python exception.
        CArrowChunkIterator(PyObject* context) except +
# CArrowIterator subclass from namespace ``sf``, as declared in
# CArrowTableIterator.hpp.
cdef extern from "cpp/ArrowIterator/CArrowTableIterator.hpp" namespace "sf":
    cdef cppclass CArrowTableIterator(CArrowIterator):
        # ``except +`` turns a C++ exception thrown by the constructor into a
        # Python exception.
        CArrowTableIterator(PyObject* context) except +
cdef class PyArrowIterator:
    """Python-facing wrapper around a C++ ``CArrowIterator``.

    Usage: construct with a record-batch reader and a context object, call
    ``init()`` with ``ROW_UNIT`` or ``TABLE_UNIT`` to build the C++ iterator,
    then call ``next()`` on the instance until ``StopIteration`` is raised.
    """

    # Iterable of record batches; iterated by init().
    cdef object reader
    # Object handed (as a raw pointer) to the C++ iterator constructor.
    cdef object context
    # C++ iterator owned by this object; NULL until init() succeeds and
    # deleted in __dealloc__.
    cdef CArrowIterator* cIterator
    # Unit passed to the last successful init(); '' before that.
    cdef str unit
    # Raw result of the most recent cIterator.next() call.
    cdef PyObject* cret

    def __cinit__(self, object arrow_stream_reader, object arrow_context):
        self.reader = arrow_stream_reader
        self.context = arrow_context
        self.cIterator = NULL
        self.unit = ''

    def __dealloc__(self):
        # Deleting a NULL pointer is a no-op in C++, so this is safe even
        # when init() was never called.
        del self.cIterator

    def __next__(self):
        """Return the next item produced by the C++ iterator.

        Raises:
            StopIteration: when the C++ iterator hands back ``None``.
            RuntimeError: when ``init()`` has not been called yet, or when
                the C++ iterator returns NULL.
        """
        if self.cIterator is NULL:
            # Calling a method through a NULL C++ pointer would crash the
            # process instead of raising.
            raise RuntimeError(
                "PyArrowIterator.init() must be called before iterating")

        self.cret = self.cIterator.next()
        if self.cret is NULL:
            # Original author's note: calling into Python here appears to
            # make an error flagged by the C++ side surface immediately,
            # although the log line itself was observed not to show up.
            logger.error("Internal error from CArrowIterator\n")
            # A NULL pointer must never reach the <object> cast below: the
            # cast increments the reference count of whatever it points at.
            raise RuntimeError("Internal error from CArrowIterator")

        # NOTE(review): the <object> cast takes its own reference. If the C++
        # next() already returns an owned reference, that one is never
        # released -- confirm ownership against CArrowIterator.hpp.
        ret = <object>self.cret
        if ret is None:
            raise StopIteration
        return ret

    def init(self, str iter_unit):
        """Build the C++ iterator for ``iter_unit`` and load the record batches.

        Args:
            iter_unit: ``ROW_UNIT`` for a chunk (row) iterator or
                ``TABLE_UNIT`` for a table iterator.

        Raises:
            NotImplementedError: for any other ``iter_unit``.
        """
        cdef CArrowIterator* fresh = NULL

        # Build the replacement first, so that a failing constructor leaves
        # any existing iterator untouched.
        if iter_unit == ROW_UNIT:
            fresh = new CArrowChunkIterator(<PyObject*>self.context)
        elif iter_unit == TABLE_UNIT:
            fresh = new CArrowTableIterator(<PyObject*>self.context)
        else:
            raise NotImplementedError

        # Release the iterator from an earlier init() call instead of leaking
        # it (deleting NULL on the first call is a no-op).
        del self.cIterator
        self.cIterator = fresh
        self.unit = iter_unit

        # Feed every record batch from the reader to the C++ iterator, then
        # reset it before the first next() call.
        for rb in self.reader:
            self.cIterator.addRecordBatch(<PyObject*>rb)
        self.cIterator.reset()