forked from snowflakedb/snowflake-connector-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
converter_issue23517.py
101 lines (75 loc) · 2.79 KB
/
converter_issue23517.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Snowflake Computing Inc. All right reserved.
#
from datetime import timedelta
from logging import getLogger
import pytz
from .converter import (
SnowflakeConverter,
ZERO_EPOCH,
_generate_tzinfo_from_tzoffset)
logger = getLogger(__name__)
class SnowflakeConverterIssue23517(SnowflakeConverter):
"""
Converter for Python 3.4.3 and 3.5.0
This is to address http://bugs.python.org/issue23517
"""
def __init__(self, **kwargs):
super(SnowflakeConverterIssue23517, self).__init__(**kwargs)
logger.debug('initialized')
def _TIMESTAMP_TZ_to_python(self, ctx):
"""
TIMESTAMP TZ to datetime
The timezone offset is piggybacked.
"""
scale = ctx['scale']
def conv0(encoded_value):
value, tz = encoded_value.split()
tzinfo = _generate_tzinfo_from_tzoffset(int(tz) - 1440)
microseconds = float(value)
t = ZERO_EPOCH + timedelta(seconds=microseconds)
if pytz.utc != tzinfo:
t += tzinfo.utcoffset(t)
return t.replace(tzinfo=tzinfo)
def conv(encoded_value):
value, tz = encoded_value.split()
tzinfo = _generate_tzinfo_from_tzoffset(int(tz) - 1440)
microseconds = float(value[0:-scale + 6])
t = ZERO_EPOCH + timedelta(seconds=microseconds)
if pytz.utc != tzinfo:
t += tzinfo.utcoffset(t)
return t.replace(tzinfo=tzinfo)
return conv if scale > 6 else conv0
def _TIMESTAMP_NTZ_to_python(self, ctx):
"""
TIMESTAMP NTZ to datetime
No timezone info is attached.
"""
scale = ctx['scale']
def conv0(value):
logger.debug('timestamp_ntz: %s', value)
return ZERO_EPOCH + timedelta(seconds=(float(value)))
def conv(value):
logger.debug('timestamp_ntz: %s', value)
microseconds = float(value[0:-scale + 6])
return ZERO_EPOCH + timedelta(seconds=(microseconds))
return conv if scale > 6 else conv0
def _TIMESTAMP_LTZ_to_python(self, ctx):
def conv(value):
t, _ = self._pre_TIMESTAMP_LTZ_to_python(value, ctx)
return t
return conv
def _TIME_to_python(self, ctx):
"""
TIME to formatted string, SnowflakeDateTime, or datetime.time
No timezone is attached.
"""
scale = ctx['scale']
conv0 = lambda value: (
ZERO_EPOCH + timedelta(seconds=(float(value)))).time()
def conv(value):
microseconds = float(value[0:-scale + 6])
return (ZERO_EPOCH + timedelta(seconds=(microseconds))).time()
return conv if scale > 6 else conv0