forked from citusdata/cstore_fdw
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cstore_compression.c
171 lines (141 loc) · 4.86 KB
/
cstore_compression.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*-------------------------------------------------------------------------
*
* cstore_compression.c
*
* This file contains compression/decompression functions definitions
* used in cstore_fdw.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "cstore_fdw.h"
#if PG_VERSION_NUM >= 90500
#include "common/pg_lzcompress.h"
#else
#include "utils/pg_lzcompress.h"
#endif
#if PG_VERSION_NUM >= 90500
/*
* The information at the start of the compressed data. This decription is taken
* from pg_lzcompress in pre-9.5 version of PostgreSQL.
*/
typedef struct CStoreCompressHeader
{
int32 vl_len_; /* varlena header (do not touch directly!) */
int32 rawsize;
} CStoreCompressHeader;
/*
* Utilities for manipulation of header information for compressed data
*/
#define CSTORE_COMPRESS_HDRSZ ((int32) sizeof(CStoreCompressHeader))
#define CSTORE_COMPRESS_RAWSIZE(ptr) (((CStoreCompressHeader *) (ptr))->rawsize)
#define CSTORE_COMPRESS_RAWDATA(ptr) (((char *) (ptr)) + CSTORE_COMPRESS_HDRSZ)
#define CSTORE_COMPRESS_SET_RAWSIZE(ptr, len) (((CStoreCompressHeader *) (ptr))->rawsize = (len))
#else
#define CSTORE_COMPRESS_HDRSZ (0)
#define CSTORE_COMPRESS_RAWSIZE(ptr) (PGLZ_RAW_SIZE((PGLZ_Header *) buffer->data))
#define CSTORE_COMPRESS_RAWDATA(ptr) (((PGLZ_Header *) (ptr)))
#define CSTORE_COMPRESS_SET_RAWSIZE(ptr, len) (((CStoreCompressHeader *) (ptr))->rawsize = (len))
#endif
/*
* CompressBuffer compresses the given buffer with the given compression type
* outputBuffer enlarged to contain compressed data. The function returns true
* if compression is done, returns false if compression is not done.
* outputBuffer is valid only if the function returns true.
*/
bool
CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
CompressionType compressionType)
{
uint64 maximumLength = PGLZ_MAX_OUTPUT(inputBuffer->len) + CSTORE_COMPRESS_HDRSZ;
bool compressionResult = false;
#if PG_VERSION_NUM >= 90500
int32 compressedByteCount = 0;
#endif
if (compressionType != COMPRESSION_PG_LZ)
{
return false;
}
resetStringInfo(outputBuffer);
enlargeStringInfo(outputBuffer, maximumLength);
#if PG_VERSION_NUM >= 90500
compressedByteCount = pglz_compress((const char *) inputBuffer->data,
inputBuffer->len,
CSTORE_COMPRESS_RAWDATA(outputBuffer->data),
PGLZ_strategy_always);
if (compressedByteCount >= 0)
{
CSTORE_COMPRESS_SET_RAWSIZE(outputBuffer->data, inputBuffer->len);
SET_VARSIZE_COMPRESSED(outputBuffer->data,
compressedByteCount + CSTORE_COMPRESS_HDRSZ);
compressionResult = true;
}
#else
compressionResult = pglz_compress(inputBuffer->data, inputBuffer->len,
CSTORE_COMPRESS_RAWDATA(outputBuffer->data),
PGLZ_strategy_always);
#endif
if (compressionResult)
{
outputBuffer->len = VARSIZE(outputBuffer->data);
}
return compressionResult;
}
/*
* DecompressBuffer decompresses the given buffer with the given compression
* type. This function returns the buffer as-is when no compression is applied.
*/
StringInfo
DecompressBuffer(StringInfo buffer, CompressionType compressionType)
{
StringInfo decompressedBuffer = NULL;
Assert(compressionType == COMPRESSION_NONE || compressionType == COMPRESSION_PG_LZ);
if (compressionType == COMPRESSION_NONE)
{
/* in case of no compression, return buffer */
decompressedBuffer = buffer;
}
else if (compressionType == COMPRESSION_PG_LZ)
{
uint32 compressedDataSize = VARSIZE(buffer->data) - CSTORE_COMPRESS_HDRSZ;
uint32 decompressedDataSize = CSTORE_COMPRESS_RAWSIZE(buffer->data);
char *decompressedData = NULL;
#if PG_VERSION_NUM >= 90500
int32 decompressedByteCount = 0;
#endif
if (compressedDataSize + CSTORE_COMPRESS_HDRSZ != buffer->len)
{
ereport(ERROR, (errmsg("cannot decompress the buffer"),
errdetail("Expected %u bytes, but received %u bytes",
compressedDataSize, buffer->len)));
}
decompressedData = palloc0(decompressedDataSize);
#if PG_VERSION_NUM >= 90500
#if PG_VERSION_NUM >= 120000
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
compressedDataSize, decompressedData,
decompressedDataSize, true);
#else
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
compressedDataSize, decompressedData,
decompressedDataSize);
#endif
if (decompressedByteCount < 0)
{
ereport(ERROR, (errmsg("cannot decompress the buffer"),
errdetail("compressed data is corrupted")));
}
#else
pglz_decompress((PGLZ_Header *) buffer->data, decompressedData);
#endif
decompressedBuffer = palloc0(sizeof(StringInfoData));
decompressedBuffer->data = decompressedData;
decompressedBuffer->len = decompressedDataSize;
decompressedBuffer->maxlen = decompressedDataSize;
}
return decompressedBuffer;
}