-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
313 additions
and
277 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
"""BINN (https://github.com/liteserver/binn) serialization/deserialization module""" | ||
|
||
__version__ = '0.9.1' | ||
__all__ = [ | ||
'dumps', 'loads', | ||
'BINNDecoder', 'BINNEncoder', | ||
] | ||
|
||
__author__ = 'Miron Jakubowski <mijakubowski@gmail.com>' | ||
|
||
from .encoder import BINNEncoder | ||
from .decoder import BINNDecoder | ||
|
||
def dumps(value): | ||
"""Serialize ``value`` to BINN data""" | ||
return BINNEncoder().encode(value) | ||
|
||
def loads(buffer): | ||
"""Deserialize buffer to object""" | ||
return BINNDecoder(buffer).decode() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
"""Common types for encoder and decoder""" | ||
|
||
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" | ||
|
||
# Data Formats | ||
BINN_OBJECT = b'\xe2' | ||
BINN_LIST = b'\xe0' | ||
|
||
BINN_STRING = b'\xa0' | ||
BINN_DATETIME = b'\xa1' # in DATETIME_FORMAT format | ||
|
||
BINN_BLOB = b'\xc0' | ||
|
||
BINN_NULL = b'\x00' | ||
BINN_TRUE = b'\x01' | ||
BINN_FALSE = b'\x02' | ||
|
||
BINN_UINT8 = b'\x20' | ||
BINN_INT8 = b'\x21' | ||
BINN_UINT16 = b'\x40' | ||
BINN_INT16 = b'\x41' | ||
BINN_UINT32 = b'\x60' | ||
BINN_INT32 = b'\x61' | ||
BINN_UINT64 = b'\x80' | ||
BINN_INT64 = b'\x81' | ||
BINN_FLOAT64 = b'\x82' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
"""Implementation of BINNDecoder""" | ||
|
||
import io | ||
from struct import unpack | ||
from time import strptime | ||
|
||
import pybinn.datatypes as types | ||
|
||
class BINNDecoder(object): | ||
"""BINN <https://github.com/liteserver/binn> decoder for Python""" | ||
def __init__(self, buffer): | ||
self._buffer = io.BytesIO(buffer) | ||
|
||
def decode(self): | ||
"""Decode date from buffer""" | ||
binntype = self._buffer.read(1) | ||
if binntype == types.BINN_STRING: | ||
return self._decode_str() | ||
if binntype == types.BINN_UINT8: | ||
return unpack('B', self._buffer.read(1))[0] | ||
if binntype == types.BINN_INT8: | ||
return unpack('b', self._buffer.read(1))[0] | ||
if binntype == types.BINN_UINT16: | ||
return unpack('H', self._buffer.read(2))[0] | ||
if binntype == types.BINN_INT16: | ||
return unpack('h', self._buffer.read(2))[0] | ||
if binntype == types.BINN_UINT32: | ||
return unpack('I', self._buffer.read(4))[0] | ||
if binntype == types.BINN_INT32: | ||
return unpack('i', self._buffer.read(4))[0] | ||
if binntype == types.BINN_UINT64: | ||
return unpack('L', self._buffer.read(8))[0] | ||
if binntype == types.BINN_INT64: | ||
return unpack('l', self._buffer.read(8))[0] | ||
if binntype == types.BINN_FLOAT64: | ||
return unpack('d', self._buffer.read(8))[0] | ||
if binntype == types.BINN_BLOB: | ||
return self._decode_bytes() | ||
if binntype == types.BINN_DATETIME: | ||
return self._decode_time() | ||
if binntype == types.BINN_LIST: | ||
return self._decode_list() | ||
if binntype == types.BINN_OBJECT: | ||
return self._decode_dict() | ||
if binntype == types.BINN_TRUE: | ||
return True | ||
if binntype == types.BINN_FALSE: | ||
return False | ||
if binntype == types.BINN_NULL: | ||
return None | ||
raise TypeError("Invalid Binn data format: {}".format(type)) | ||
|
||
def _decode_str(self): | ||
size = self._from_varint() | ||
value = str(self._buffer.read(size), 'utf8') | ||
# Ready null terminator byte to advance position | ||
self._buffer.read(1) | ||
return value | ||
|
||
def _decode_bytes(self): | ||
size = unpack('I', self._buffer.read(4))[0] | ||
return self._buffer.read(size) | ||
|
||
def _decode_time(self): | ||
time_str = self._decode_str() | ||
# decode time in UTC timezone | ||
time_str += "+GMT" | ||
return strptime(time_str, types.DATETIME_FORMAT + "+%Z") | ||
|
||
def _decode_list(self): | ||
# read container size | ||
self._from_varint() | ||
count = self._from_varint() | ||
result = [] | ||
for i in range(count): | ||
result.append(self.decode()) | ||
return result | ||
|
||
def _decode_dict(self): | ||
# read container size | ||
self._from_varint() | ||
count = self._from_varint() | ||
result = {} | ||
for i in range(count): | ||
key_size = unpack('B', self._buffer.read(1))[0] | ||
key = str(self._buffer.read(key_size), 'utf8') | ||
result[key] = self.decode() | ||
return result | ||
|
||
def _from_varint(self): | ||
value = unpack('B', self._buffer.read(1))[0] | ||
if value & 0x80: | ||
self._buffer.seek(self._buffer.tell() - 1) | ||
value = unpack('>I', self._buffer.read(4))[0] | ||
value &= 0x7FFFFFFF | ||
return value |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
"""Implementation of BINNEncoder""" | ||
|
||
from io import BytesIO | ||
from struct import pack | ||
from time import struct_time, strftime | ||
|
||
import pybinn.datatypes as types | ||
|
||
class BINNEncoder(object): | ||
"""BINN <https://github.com/liteserver/binn> encoder for Python""" | ||
def __init__(self): | ||
self._buffer = BytesIO() | ||
|
||
def encode(self, value): | ||
"""Encode value""" | ||
self._encode(value) | ||
return self._buffer.getvalue() | ||
|
||
def _encode(self, value): | ||
value_type = type(value) | ||
if value_type == type(None): | ||
self._buffer.write(types.BINN_NULL) | ||
return | ||
if value_type is str: | ||
self._encode_str(value) | ||
return | ||
if value_type is int: | ||
self._encode_int(value) | ||
return | ||
if value_type is bool: | ||
self._encode_bool(value) | ||
return | ||
if value_type is float: | ||
self._encode_float(value) | ||
return | ||
if value_type is bytes: | ||
self._encode_bytes(value) | ||
return | ||
if value_type is struct_time: | ||
self._encode_time(value) | ||
return | ||
if value_type is list: | ||
self._encode_list(value) | ||
return | ||
if value_type is dict: | ||
self._encode_dict(value) | ||
return | ||
raise TypeError("Invalid type for encode: {}".format(value_type)) | ||
|
||
def _encode_str(self, value, data_type=types.BINN_STRING): | ||
size = len(value.encode('utf8')) | ||
self._buffer.write(data_type) | ||
self._buffer.write(self._to_varint(size)) | ||
self._buffer.write(value.encode('utf8') + b'\0') | ||
|
||
def _encode_uint(self, value): | ||
# unsigned char (byte) | ||
if value < 0x100: | ||
self._buffer.write(types.BINN_UINT8) | ||
self._buffer.write(pack('B', value)) | ||
return | ||
# unsigned short | ||
if value < 0x10000: | ||
self._buffer.write(types.BINN_UINT16) | ||
self._buffer.write(pack('H', value)) | ||
return | ||
# unsigned int | ||
if value < 0x100000000: | ||
self._buffer.write(types.BINN_UINT32) | ||
self._buffer.write(pack('I', value)) | ||
return | ||
# unsigned long | ||
if value < 0x10000000000000000: | ||
self._buffer.write(types.BINN_UINT64) | ||
self._buffer.write(pack('L', value)) | ||
return | ||
raise OverflowError("Value to big {}.".format(hex(value))) | ||
|
||
def _encode_int(self, value): | ||
if value >= 0: | ||
return self._encode_uint(value) | ||
# signed char | ||
if value >= -0x80: | ||
self._buffer.write(types.BINN_INT8) | ||
self._buffer.write(pack('b', value)) | ||
return | ||
# short | ||
if value >= -0x8000: | ||
self._buffer.write(types.BINN_INT16) | ||
self._buffer.write(pack('h', value)) | ||
return | ||
# int | ||
if value >= -0x80000000: | ||
self._buffer.write(types.BINN_INT32) | ||
self._buffer.write(pack('i', value)) | ||
return | ||
# long | ||
if value >= -0x8000000000000000: | ||
self._buffer.write(types.BINN_INT64) | ||
self._buffer.write(pack('l', value)) | ||
|
||
def _encode_bool(self, value): | ||
if value: | ||
self._buffer.write(types.BINN_TRUE) | ||
if not value: | ||
self._buffer.write(types.BINN_FALSE) | ||
|
||
def _encode_float(self, value): | ||
self._buffer.write(types.BINN_FLOAT64) | ||
self._buffer.write(pack('d', value)) | ||
|
||
def _encode_bytes(self, value): | ||
self._buffer.write(types.BINN_BLOB) | ||
self._buffer.write(pack('I', len(value))) | ||
self._buffer.write(value) | ||
|
||
def _encode_time(self, value): | ||
time_str = strftime(types.DATETIME_FORMAT, value) | ||
self._encode_str(time_str, types.BINN_DATETIME) | ||
|
||
def _encode_list(self, value): | ||
with BytesIO() as buffer: | ||
for item in value: | ||
buffer.write(BINNEncoder().encode(item)) | ||
|
||
self._buffer.write(types.BINN_LIST) | ||
self._buffer.write(self._to_varint(buffer.tell() + 3)) | ||
self._buffer.write(self._to_varint(len(value))) | ||
self._buffer.write(buffer.getvalue()) | ||
|
||
def _encode_dict(self, value): | ||
with BytesIO() as buffer: | ||
for key in value: | ||
if len(key) > 255: | ||
raise OverflowError("Key '{}' is to big. Max length is 255.".format(key)) | ||
buffer.write(pack('B', len(key))) | ||
buffer.write(key.encode('utf8')) | ||
buffer.write(BINNEncoder().encode(value[key])) | ||
|
||
self._buffer.write(types.BINN_OBJECT) | ||
self._buffer.write(self._to_varint(buffer.tell() + 3)) | ||
self._buffer.write(self._to_varint(len(value))) | ||
self._buffer.write(buffer.getvalue()) | ||
|
||
|
||
def _to_varint(self, value): | ||
if value > 127: | ||
return pack('>I', value | 0x80000000) | ||
return pack('B', value) |
Oops, something went wrong.