Skip to content

Commit

Permalink
ORC-586: [C++] fix memory leak in StructColumnReader
Browse files Browse the repository at this point in the history
Substituted raw pointers to std::unique_ptrs in StructColumnReader
in order to prevent memory leaks.

This fixes #466
  • Loading branch information
boroknagyz authored and wgtmac committed Jan 13, 2020
1 parent 20da4a1 commit 028261a
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 94 deletions.
25 changes: 4 additions & 21 deletions c++/src/ColumnPrinter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,20 @@ namespace orc {
private:
const unsigned char *tags;
const uint64_t* offsets;
std::vector<ColumnPrinter*> fieldPrinter;
std::vector<std::unique_ptr<ColumnPrinter>> fieldPrinter;

public:
UnionColumnPrinter(std::string&, const Type& type);
virtual ~UnionColumnPrinter() override;
void printRow(uint64_t rowId) override;
void reset(const ColumnVectorBatch& batch) override;
};

class StructColumnPrinter: public ColumnPrinter {
private:
std::vector<ColumnPrinter*> fieldPrinter;
std::vector<std::unique_ptr<ColumnPrinter>> fieldPrinter;
std::vector<std::string> fieldNames;
public:
StructColumnPrinter(std::string&, const Type& type);
virtual ~StructColumnPrinter() override;
void printRow(uint64_t rowId) override;
void reset(const ColumnVectorBatch& batch) override;
};
Expand Down Expand Up @@ -540,14 +538,7 @@ namespace orc {
tags(nullptr),
offsets(nullptr) {
for(unsigned int i=0; i < type.getSubtypeCount(); ++i) {
fieldPrinter.push_back(createColumnPrinter(buffer, type.getSubtype(i))
.release());
}
}

UnionColumnPrinter::~UnionColumnPrinter() {
for (size_t i = 0; i < fieldPrinter.size(); i++) {
delete fieldPrinter[i];
fieldPrinter.push_back(createColumnPrinter(buffer, type.getSubtype(i)));
}
}

Expand Down Expand Up @@ -582,15 +573,7 @@ namespace orc {
): ColumnPrinter(_buffer) {
for(unsigned int i=0; i < type.getSubtypeCount(); ++i) {
fieldNames.push_back(type.getFieldName(i));
fieldPrinter.push_back(createColumnPrinter(buffer,
type.getSubtype(i))
.release());
}
}

StructColumnPrinter::~StructColumnPrinter() {
for (size_t i = 0; i < fieldPrinter.size(); i++) {
delete fieldPrinter[i];
fieldPrinter.push_back(createColumnPrinter(buffer, type.getSubtype(i)));
}
}

Expand Down
40 changes: 11 additions & 29 deletions c++/src/ColumnReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -835,11 +835,10 @@ namespace orc {

class StructColumnReader: public ColumnReader {
private:
std::vector<ColumnReader*> children;
std::vector<std::unique_ptr<ColumnReader>> children;

public:
StructColumnReader(const Type& type, StripeStreams& stipe);
~StructColumnReader() override;

uint64_t skip(uint64_t numValues) override;

Expand Down Expand Up @@ -871,7 +870,7 @@ namespace orc {
for(unsigned int i=0; i < type.getSubtypeCount(); ++i) {
const Type& child = *type.getSubtype(i);
if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
children.push_back(buildReader(child, stripe).release());
children.push_back(buildReader(child, stripe));
}
}
break;
Expand All @@ -883,16 +882,10 @@ namespace orc {
}
}

StructColumnReader::~StructColumnReader() {
for (size_t i=0; i<children.size(); i++) {
delete children[i];
}
}

uint64_t StructColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
for(std::vector<ColumnReader*>::iterator ptr=children.begin(); ptr != children.end(); ++ptr) {
(*ptr)->skip(numValues);
for(auto& ptr : children) {
ptr->skip(numValues);
}
return numValues;
}
Expand All @@ -916,13 +909,12 @@ namespace orc {
ColumnReader::next(rowBatch, numValues, notNull);
uint64_t i=0;
notNull = rowBatch.hasNulls? rowBatch.notNull.data() : nullptr;
for(std::vector<ColumnReader*>::iterator ptr=children.begin();
ptr != children.end(); ++ptr, ++i) {
for(auto iter = children.begin(); iter != children.end(); ++iter, ++i) {
if (encoded) {
(*ptr)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
(*iter)->nextEncoded(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
numValues, notNull);
} else {
(*ptr)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
(*iter)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
numValues, notNull);
}
}
Expand All @@ -932,10 +924,8 @@ namespace orc {
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);

for(std::vector<ColumnReader*>::iterator ptr = children.begin();
ptr != children.end();
++ptr) {
(*ptr)->seekToRowGroup(positions);
for(auto& ptr : children) {
ptr->seekToRowGroup(positions);
}
}

Expand Down Expand Up @@ -1230,13 +1220,12 @@ namespace orc {
class UnionColumnReader: public ColumnReader {
private:
std::unique_ptr<ByteRleDecoder> rle;
std::vector<ColumnReader*> childrenReader;
std::vector<std::unique_ptr<ColumnReader>> childrenReader;
std::vector<int64_t> childrenCounts;
uint64_t numChildren;

public:
UnionColumnReader(const Type& type, StripeStreams& stipe);
~UnionColumnReader() override;

uint64_t skip(uint64_t numValues) override;

Expand Down Expand Up @@ -1275,18 +1264,11 @@ namespace orc {
for(unsigned int i=0; i < numChildren; ++i) {
const Type &child = *type.getSubtype(i);
if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
childrenReader[i] = buildReader(child, stripe).release();
childrenReader[i] = buildReader(child, stripe);
}
}
}

UnionColumnReader::~UnionColumnReader() {
for(std::vector<ColumnReader*>::iterator itr = childrenReader.begin();
itr != childrenReader.end(); ++itr) {
delete *itr;
}
}

uint64_t UnionColumnReader::skip(uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
const uint64_t BUFFER_SIZE = 1024;
Expand Down
22 changes: 4 additions & 18 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ namespace orc {
const Type& type,
const StreamsFactory& factory,
const WriterOptions& options);
~StructColumnWriter() override;

virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
Expand Down Expand Up @@ -285,7 +284,7 @@ namespace orc {
virtual void reset() override;

private:
std::vector<ColumnWriter *> children;
std::vector<std::unique_ptr<ColumnWriter>> children;
};

StructColumnWriter::StructColumnWriter(
Expand All @@ -295,20 +294,14 @@ namespace orc {
ColumnWriter(type, factory, options) {
for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
const Type& child = *type.getSubtype(i);
children.push_back(buildWriter(child, factory, options).release());
children.push_back(buildWriter(child, factory, options));
}

if (enableIndex) {
recordPosition();
}
}

StructColumnWriter::~StructColumnWriter() {
for (uint32_t i = 0; i < children.size(); ++i) {
delete children[i];
}
}

void StructColumnWriter::add(
ColumnVectorBatch& rowBatch,
uint64_t offset,
Expand Down Expand Up @@ -2666,7 +2659,6 @@ namespace orc {
UnionColumnWriter(const Type& type,
const StreamsFactory& factory,
const WriterOptions& options);
~UnionColumnWriter() override;

virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
Expand Down Expand Up @@ -2703,7 +2695,7 @@ namespace orc {

private:
std::unique_ptr<ByteRleEncoder> rleEncoder;
std::vector<ColumnWriter*> children;
std::vector<std::unique_ptr<ColumnWriter>> children;
};

UnionColumnWriter::UnionColumnWriter(const Type& type,
Expand All @@ -2718,20 +2710,14 @@ namespace orc {
for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) {
children.push_back(buildWriter(*type.getSubtype(i),
factory,
options).release());
options));
}

if (enableIndex) {
recordPosition();
}
}

UnionColumnWriter::~UnionColumnWriter() {
for (uint32_t i = 0; i < children.size(); ++i) {
delete children[i];
}
}

void UnionColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
uint64_t numValues,
Expand Down
6 changes: 3 additions & 3 deletions c++/src/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,8 @@ DIAGNOSTIC_PUSH
MemoryPool& _pool
): pool(_pool),
blockSize(_blockSize),
input(std::move(inStream)),
buffer(pool, _blockSize) {
input.reset(inStream.release());
zstream.next_in = nullptr;
zstream.avail_in = 0;
zstream.zalloc = nullptr;
Expand Down Expand Up @@ -683,7 +683,8 @@ DIAGNOSTIC_POP
(std::unique_ptr<SeekableInputStream> inStream,
size_t bufferSize,
MemoryPool& _pool
) : pool(_pool),
) : input(std::move(inStream)),
pool(_pool),
inputBuffer(pool, bufferSize),
outputBuffer(pool, bufferSize),
state(DECOMPRESS_HEADER),
Expand All @@ -693,7 +694,6 @@ DIAGNOSTIC_POP
inputBufferPtr(nullptr),
inputBufferPtrEnd(nullptr),
bytesReturned(0) {
input.reset(inStream.release());
}

bool BlockDecompressionStream::Next(const void** data, int*size) {
Expand Down
8 changes: 2 additions & 6 deletions c++/src/Options.hh
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ namespace orc {

ReaderOptions::ReaderOptions(ReaderOptions& rhs) {
// swap privateBits with rhs
ReaderOptionsPrivate* l = privateBits.release();
privateBits.reset(rhs.privateBits.release());
rhs.privateBits.reset(l);
privateBits.swap(rhs.privateBits);
}

ReaderOptions& ReaderOptions::operator=(const ReaderOptions& rhs) {
Expand Down Expand Up @@ -155,9 +153,7 @@ namespace orc {

RowReaderOptions::RowReaderOptions(RowReaderOptions& rhs) {
// swap privateBits with rhs
RowReaderOptionsPrivate* l = privateBits.release();
privateBits.reset(rhs.privateBits.release());
rhs.privateBits.reset(l);
privateBits.swap(rhs.privateBits);
}

RowReaderOptions& RowReaderOptions::operator=(const RowReaderOptions& rhs) {
Expand Down
15 changes: 4 additions & 11 deletions c++/src/TypeImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,12 @@ namespace orc {
columnId = static_cast<int64_t>(root);
uint64_t current = root + 1;
for(uint64_t i=0; i < subtypeCount; ++i) {
current = dynamic_cast<TypeImpl*>(subTypes[i])->assignIds(current);
current = dynamic_cast<TypeImpl*>(subTypes[i].get())->assignIds(current);
}
maximumColumnId = static_cast<int64_t>(current) - 1;
return current;
}

TypeImpl::~TypeImpl() {
for (std::vector<Type*>::iterator it = subTypes.begin();
it != subTypes.end(); it++) {
delete (*it) ;
}
}

void TypeImpl::ensureIdAssigned() const {
if (columnId == -1) {
const TypeImpl* root = this;
Expand Down Expand Up @@ -109,7 +102,7 @@ namespace orc {
}

const Type* TypeImpl::getSubtype(uint64_t i) const {
return subTypes[i];
return subTypes[i].get();
}

const std::string& TypeImpl::getFieldName(uint64_t i) const {
Expand All @@ -134,8 +127,8 @@ namespace orc {
}

void TypeImpl::addChildType(std::unique_ptr<Type> childType) {
TypeImpl* child = dynamic_cast<TypeImpl*>(childType.release());
subTypes.push_back(child);
TypeImpl* child = dynamic_cast<TypeImpl*>(childType.get());
subTypes.push_back(std::move(childType));
if (child != nullptr) {
child->parent = this;
}
Expand Down
4 changes: 1 addition & 3 deletions c++/src/TypeImpl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace orc {
mutable int64_t columnId;
mutable int64_t maximumColumnId;
TypeKind kind;
std::vector<Type*> subTypes;
std::vector<std::unique_ptr<Type>> subTypes;
std::vector<std::string> fieldNames;
uint64_t subtypeCount;
uint64_t maxLength;
Expand All @@ -58,8 +58,6 @@ namespace orc {
TypeImpl(TypeKind kind, uint64_t precision,
uint64_t scale);

virtual ~TypeImpl() override;

uint64_t getColumnId() const override;

uint64_t getMaximumColumnId() const override;
Expand Down
4 changes: 1 addition & 3 deletions c++/src/Writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ namespace orc {

WriterOptions::WriterOptions(WriterOptions& rhs) {
// swap privateBits with rhs
WriterOptionsPrivate* l = privateBits.release();
privateBits.reset(rhs.privateBits.release());
rhs.privateBits.reset(l);
privateBits.swap(rhs.privateBits);
}

WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) {
Expand Down

0 comments on commit 028261a

Please sign in to comment.