connector.py

import snowflake.connector
import tqdm
import yaml
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
from loguru import logger
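
# The config file layout below is inferred from how __init__ reads it; the
# real config/connector.yml may differ, and every value is a placeholder.
#
#   snowflake:
#     username: <user>
#     password: <password>
#     account: <account_identifier>
#     warehouse: <warehouse>
#     database: <database>
#     scheme: <schema>      # passed to the connector's `schema` argument
#     table: <table_name>
#     limit: 1000           # rows pulled per batch
#   elasticsearch:
#     host: https://localhost:9200
#     ca_cert: <path/to/http_ca.crt>
#     username: <user>
#     password: <password>
#     index: <index_name>
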
class Connector:
    def __init__(self):
        logger.info("Loading configs")
        with open('config/connector.yml', 'r') as file:
            c = yaml.safe_load(file)
        self._c = c

        logger.info("Connecting to Snowflake ...")
        conn_sf = snowflake.connector.connect(
            user=c['snowflake']['username'],
            password=c['snowflake']['password'],
            account=c['snowflake']['account'],
            warehouse=c['snowflake']['warehouse'],
            database=c['snowflake']['database'],
            schema=c['snowflake']['scheme']
        )
        self._sf = conn_sf.cursor()

        logger.info("Connecting to Elasticsearch ...")
        conn_es = Elasticsearch(
            c['elasticsearch']['host'],
            ca_certs=c['elasticsearch']['ca_cert'],
            basic_auth=(c['elasticsearch']['username'], c['elasticsearch']['password'])
        )
        self._es = conn_es
    def getColumns(self):
        # SHOW COLUMNS returns one row per column; the column name is at index 2.
        c = "show columns in table " + self._c['snowflake']['table']
        result = self._sf.execute(c)
        col = []
        for row in result:
            col.append(row[2])
        return col
    def getCount(self):
        c = "select count(*) as count from " + self._c['snowflake']['table']
        result = self._sf.execute(c)
        for row in result:
            count = row[0]
        logger.info("Total records found: " + str(count))
        return count
    def pull(self, offset):
        # Yield one document per row, keyed by column name, for streaming_bulk.
        col = self.getColumns()
        select_columns = ', '.join(col)
        q = ("select " + select_columns + " from " + self._c['snowflake']['table']
             + " limit " + str(self._c['snowflake']['limit'])
             + " offset " + str(offset))
        result = self._sf.execute(q)
        for row in result:
            doc = dict(zip(col, row))
            yield doc
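
    # Each document yielded by pull() is a plain dict such as (hypothetical
    # column names, for illustration only):
    #   {"ID": 1, "NAME": "alice", "CREATED_AT": "2024-01-01"}
    # streaming_bulk below wraps each dict into an index action for the
    # configured index.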
    def stream(self, offset):
        # Bulk-index one batch; streaming_bulk yields an (ok, item) pair per
        # document and raises on failures by default.
        for ok, action in streaming_bulk(
            client=self._es,
            index=self._c['elasticsearch']['index'],
            actions=self.pull(offset),
        ):
            self._success += 1
            self._progress.update(1)
    def push(self):
        logger.info("Data transfer started")
        self._count = self.getCount()
        self._success = 0
        batch_no = 1
        limit = self._c['snowflake']['limit']
        self._progress = tqdm.tqdm(unit="docs", total=self._count)
        # Page through the table in limit-sized batches until every record
        # has been indexed.
        while self._success < self._count:
            offset = (batch_no - 1) * limit
            self.stream(offset)
            batch_no += 1
    def run(self):
        self.push()
        logger.success("Data transfer completed")
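
# Minimal usage sketch (an assumption, not part of the original module);
# expects to run from the repo root so config/connector.yml resolves.
if __name__ == "__main__":
    Connector().run()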