Skip to content

Commit

Permalink
Make recordio simple
Browse files Browse the repository at this point in the history
  • Loading branch information
reyoung authored and Xin Pan committed Mar 11, 2018
1 parent 0fc7d3c commit 6272b57
Show file tree
Hide file tree
Showing 25 changed files with 202 additions and 2,711 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ include(external/pybind11) # download pybind11
include(external/cares)
include(external/grpc)
include(external/snappy) # download snappy
include(external/snappystream)

include(cudnn) # set cudnn libraries, must before configure
include(cupti)
Expand Down
58 changes: 58 additions & 0 deletions cmake/external/snappystream.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Builds the third-party snappystream library (iostream wrappers around
# snappy) as an external project and exposes it as the imported static
# target `snappystream`. Nothing is configured when MOBILE_INFERENCE is set.
IF(MOBILE_INFERENCE)
  return()
ENDIF()

include (ExternalProject)

# NOTE: snappy is needed when linking with recordio

# Source/build tree and install tree of snappystream, both under
# THIRD_PARTY_PATH. The include directory is cached so other CMake files can
# reference it.
SET(SNAPPYSTREAM_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy_stream)
SET(SNAPPYSTREAM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy_stream)
SET(SNAPPYSTREAM_INCLUDE_DIR "${SNAPPYSTREAM_INSTALL_DIR}/include/" CACHE PATH "snappy stream include directory." FORCE)

ExternalProject_Add(
    extern_snappystream
    GIT_REPOSITORY  "https://github.com/hoxnox/snappystream.git"
    GIT_TAG         "0.2.8"
    PREFIX          ${SNAPPYSTREAM_SOURCES_DIR}
    UPDATE_COMMAND  ""
    # The install prefix/libdir must point at the snappystream install tree:
    # that is where IMPORTED_LOCATION below expects libsnappystream.a, and it
    # is what CMAKE_CACHE_ARGS passes as well. Using the snappy install dir
    # here would make the two argument lists disagree.
    CMAKE_ARGS      -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
                    -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
                    -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
                    -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
                    -DCMAKE_INSTALL_PREFIX=${SNAPPYSTREAM_INSTALL_DIR}
                    -DCMAKE_INSTALL_LIBDIR=${SNAPPYSTREAM_INSTALL_DIR}/lib
                    -DCMAKE_POSITION_INDEPENDENT_CODE=ON
                    -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
                    # Tell snappystream where the already-built snappy lives.
                    -DSNAPPY_ROOT=${SNAPPY_INSTALL_DIR}
                    ${EXTERNAL_OPTIONAL_ARGS}
    CMAKE_CACHE_ARGS
                    -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPYSTREAM_INSTALL_DIR}
                    -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPYSTREAM_INSTALL_DIR}/lib
                    -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
    BUILD_COMMAND   make -j8
    INSTALL_COMMAND make install
    # snappy has to be built and installed before snappystream is configured.
    DEPENDS         snappy
)

# Imported target wrapping the static archive produced by the external build.
add_library(snappystream STATIC IMPORTED GLOBAL)
set_property(TARGET snappystream PROPERTY IMPORTED_LOCATION
             "${SNAPPYSTREAM_INSTALL_DIR}/lib/libsnappystream.a")

include_directories(${SNAPPYSTREAM_INCLUDE_DIR})
# Anything linking `snappystream` must wait for the external build to finish.
add_dependencies(snappystream extern_snappystream)
14 changes: 3 additions & 11 deletions paddle/fluid/recordio/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
# internal library.
# NOTE(review): `header`, `chunk` and `recordio` are each declared twice in
# this span. It looks like a diff view showing removed and added lines
# together -- confirm which declaration is current before relying on it.
cc_library(io SRCS io.cc DEPS stringpiece)
cc_test(io_test SRCS io_test.cc DEPS io)
cc_library(header SRCS header.cc DEPS io)
# `header` declared without any DEPS.
cc_library(header SRCS header.cc)
cc_test(header_test SRCS header_test.cc DEPS header)
cc_library(chunk SRCS chunk.cc DEPS snappy)
# `chunk` declared against the snappystream wrapper, snappy itself, the
# `header` library and zlib.
cc_library(chunk SRCS chunk.cc DEPS snappystream snappy header zlib)
cc_test(chunk_test SRCS chunk_test.cc DEPS chunk)
cc_library(range_scanner SRCS range_scanner.cc DEPS io chunk)
cc_test(range_scanner_test SRCS range_scanner_test.cc DEPS range_scanner)
cc_library(scanner SRCS scanner.cc DEPS range_scanner)
cc_test(scanner_test SRCS scanner_test.cc DEPS scanner)
# exported library.
cc_library(recordio SRCS recordio.cc DEPS scanner chunk header)
cc_test(recordio_test SRCS recordio_test.cc DEPS scanner)
# `recordio` declared with no sources of its own; it only aggregates
# `chunk` and `header`.
cc_library(recordio DEPS chunk header)
156 changes: 89 additions & 67 deletions paddle/fluid/recordio/chunk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,97 +14,119 @@

#include "paddle/fluid/recordio/chunk.h"

#include <cstring>
#include <memory>
#include <sstream>
#include <utility>

#include "snappy.h"

#include "paddle/fluid/recordio/crc32.h"
#include "paddle/fluid/platform/enforce.h"
#include "snappystream.hpp"
#include "zlib.h"

namespace paddle {
namespace recordio {
constexpr size_t kMaxBufSize = 1024;

void Chunk::Add(const char* record, size_t length) {
records_.emplace_after(std::string(record, length));
num_bytes_ += s.size() * sizeof(char);
// Reads `in` in blocks of at most kMaxBufSize bytes and hands every block to
// `callback(const char* data, size)`.
//
// in:       stream to consume, starting at its current read position.
// limit:    maximum number of bytes to consume; a value <= 0 means "read
//           until the stream is exhausted".
// callback: invoked once per non-empty block, in stream order.
//
// Uses read()/gcount() rather than readsome(): readsome() only returns the
// bytes that are already buffered, so it may return 0 or a short count while
// more data is still available, which would end the loop too early.
template <typename Callback>
static void ReadStreamByBuf(std::istream& in, int limit, Callback callback) {
  char buf[kMaxBufSize];
  const bool bounded = limit > 0;
  size_t remaining = bounded ? static_cast<size_t>(limit) : 0;

  while (!bounded || remaining > 0) {
    // Never request more than the caller allowed, so a bounded read does not
    // consume bytes that belong to whatever follows in the stream.
    const size_t want =
        bounded ? std::min(remaining, kMaxBufSize) : kMaxBufSize;
    in.read(buf, static_cast<std::streamsize>(want));
    const std::streamsize got = in.gcount();
    if (got <= 0) {
      break;
    }
    callback(buf, got);
    if (bounded) {
      remaining -= static_cast<size_t>(got);
    }
    if (static_cast<size_t>(got) < want) {
      // A short read() means end of stream or an error: nothing more to get.
      break;
    }
  }

  if (in.eof()) {
    // read() sets eofbit and failbit when it runs into the end of the stream.
    // Drop exactly those two flags (badbit is preserved) so that callers can
    // keep using the stream, e.g. seekg() back to the beginning.
    in.clear(in.rdstate() & ~(std::ios::eofbit | std::ios::failbit));
  }
}

bool Chunk::Dump(Stream* fo, Compressor ct) {
// Copies everything that can be read from `in` (no byte limit) into `os`,
// block by block.
static void PipeStream(std::istream& in, std::ostream& os) {
  auto forward_block = [&os](const char* data, size_t count) {
    os.write(data, count);
  };
  ReadStreamByBuf(in, -1, forward_block);
}
static uint32_t Crc32Stream(std::istream& in, int limit = -1) {
auto crc = crc32(0, nullptr, 0);
ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) {
crc = crc32(crc, reinterpret_cast<const Bytef*>(buf), len);
});
return crc;
}

// Serializes this chunk to `os`: a Header followed by the record payload,
// which is passed through a snappy output stream when `ct` is kSnappy.
// Returns false without writing anything when the chunk holds no records,
// true otherwise. Throws via PADDLE_THROW for any other compressor value.
//
// NOTE(review): this span also contains statements that reference `fo`,
// `CompressData`, `Crc32` and a second `os`/`hdr` declaration. They look like
// lines of the removed Dump() implementation shown by a diff view -- confirm
// before treating the body as compilable as-is.
bool Chunk::Write(std::ostream& os, Compressor ct) const {
// NOTE(dzhwinter): don't check records.numBytes instead, because
// empty records are allowed.
if (records_.size() == 0) return false;
if (records_.empty()) {
return false;
}
// The payload is staged in memory first so its length and checksum can be
// put into the header that precedes it.
std::stringstream sout;
std::unique_ptr<std::ostream> compressed_stream;
switch (ct) {
case Compressor::kNoCompress:
break;
case Compressor::kSnappy:
compressed_stream.reset(new snappy::oSnappyStream(sout));
break;
default:
PADDLE_THROW("Not implemented");
}

// Records go through the compressing stream when there is one, otherwise
// straight into the staging buffer.
std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout;

// pack the record into consecutive memory for compress
std::ostringstream os;
for (auto& record : records_) {
os.write(record.size(), sizeof(size_t));
os.write(record.data(), static_cast<std::streamsize>(record.size()));
// Each record is written as a sizeof(uint32_t)-byte length prefix followed
// by the record bytes.
// NOTE(review): the prefix is the first sizeof(uint32_t) bytes of a size_t,
// which presumably assumes a little-endian host -- confirm.
size_t sz = record.size();
buf_stream.write(reinterpret_cast<const char*>(&sz), sizeof(uint32_t))
.write(record.data(), record.size());
}

std::unique_ptr<char[]> buffer(new char[num_bytes_]);
size_t compressed =
CompressData(os.str().c_str(), num_bytes_, ct, buffer.get());
uint32_t checksum = Crc32(buffer.get(), compressed);
Header hdr(records_.size(), checksum, ct, static_cast<uint32_t>(compressed));
hdr.Write(fo);
fo.Write(buffer.get(), compressed);
// clear the content
records_.clear();
num_bytes_ = 0;
// Destroy the compressing stream before sout is measured and read back.
// NOTE(review): presumably this is what flushes pending compressed output
// into sout -- confirm against the snappystream documentation.
if (compressed_stream) {
compressed_stream.reset();
}

// NOTE(review): end_pos is taken with tellg(), i.e. the read position, before
// anything has been read from sout, so `len` may evaluate to 0 instead of the
// payload size. tellp(), or seeking to the end first, looks like what was
// intended -- confirm.
auto end_pos = sout.tellg();
sout.seekg(0, std::ios::beg);
uint32_t len = static_cast<uint32_t>(end_pos - sout.tellg());
uint32_t crc = Crc32Stream(sout);
// Rewind once more so the payload can be copied out after checksumming.
sout.seekg(0, std::ios::beg);

// Header first (record count, checksum, compressor, payload length), then
// the staged payload.
Header hdr(static_cast<uint32_t>(records_.size()), crc, ct, len);
hdr.Write(os);
PipeStream(sout, os);
return true;
}

// Reads one chunk from `sin`: parses the Header, checks the payload checksum
// against it, clears the current contents and then decodes hdr.NumRecords()
// length-prefixed records, adding each one to this chunk.
//
// NOTE(review): this span interleaves the stream-based Parse(std::istream&)
// with what look like lines of the removed Parse(Stream*, size_t),
// CompressData and DeflateData shown by a diff view (everything using `fi`,
// `in`/`out` buffers or snappy::Raw*). Confirm which lines are current before
// treating the body as compilable as-is.
void Chunk::Parse(Stream* fi, size_t offset) {
fi->Seek(offset);
void Chunk::Parse(std::istream& sin) {
Header hdr;
hdr.Parse(fi);

size_t size = static_cast<size_t>(hdr.CompressSize());
std::unique_ptr<char[]> buffer(new char[size]);
fi->Read(buffer.get(), size);
size_t deflated_size = 0;
snappy::GetUncompressedLength(buffer.get(), size, &deflated_size);
std::unique_ptr<char[]> deflated_buffer(new char[deflated_size]);
DeflateData(buffer.get(), size, hdr.CompressType(), deflated_buffer.get());
std::istringstream deflated(
std::string(deflated_buffer.get(), deflated_size));
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
size_t rs;
deflated.read(&rs, sizeof(size_t));
std::string record(rs, '\0');
deflated.read(&record[0], rs);
records_.emplace_back(record);
num_bytes_ += record.size();
}
}
hdr.Parse(sin);
// Remember where the payload starts: it is read once for the checksum and
// then a second time for decoding.
auto beg_pos = sin.tellg();
auto crc = Crc32Stream(sin, hdr.CompressSize());
PADDLE_ENFORCE_EQ(hdr.Checksum(), crc);

size_t CompressData(const char* in,
size_t in_length,
Compressor ct,
char* out) {
size_t compressd_size = 0;
switch (ct) {
// Drop whatever the chunk held before; parsed records replace it.
Clear();

// Go back to the start of the payload after the checksum pass consumed it.
sin.seekg(beg_pos, std::ios::beg);
std::unique_ptr<std::istream> compressed_stream;
switch (hdr.CompressType()) {
case Compressor::kNoCompress:
// do nothing
memcpy(out, in, in_length);
compressd_size = in_length;
break;
case Compressor::kSnappy:
snappy::RawCompress(in, in_length, out, &compressd_size);
// Read the records through a snappy input stream layered on top of sin.
compressed_stream.reset(new snappy::iSnappyStream(sin));
break;
default:
PADDLE_THROW("Not implemented");
}
return compressd_size;
}

void DeflateData(const char* in, size_t in_length, Compressor ct, char* out) {
switch (c) {
case Compressor::kNoCompress:
memcpy(out, in, in_length);
break;
case Compressor::kSnappy:
snappy::RawUncompress(in, in_length, out);
break;
// Records come from the snappy stream when one was created, otherwise
// directly from sin.
std::istream& stream = compressed_stream ? *compressed_stream : sin;

// Each record is a sizeof(uint32_t)-byte length followed by that many bytes.
// NOTE(review): the stream state is not checked after either read(), so a
// truncated payload is not detected here.
for (uint32_t i = 0; i < hdr.NumRecords(); ++i) {
uint32_t rec_len;
stream.read(reinterpret_cast<char*>(&rec_len), sizeof(uint32_t));
std::string buf;
buf.resize(rec_len);
stream.read(&buf[0], rec_len);
Add(buf);
}
}

Expand Down
21 changes: 14 additions & 7 deletions paddle/fluid/recordio/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.

#pragma once
#include <forward_list>
#include <string>
#include <vector>

#include "paddle/fluid/platform/macros.h"
#include "paddle/fluid/recordio/header.h"
#include "paddle/fluid/recordio/io.h"

namespace paddle {
namespace recordio {
Expand All @@ -26,16 +26,23 @@ namespace recordio {
class Chunk {
public:
Chunk() : num_bytes_(0) {}
void Add(const char* record, size_t size);
// Appends a copy of `buf` as a new record and adds its length to the running
// byte total.
void Add(std::string buf) {
  const size_t added_bytes = buf.size();
  records_.push_back(buf);
  num_bytes_ += added_bytes;
}
// Writes the chunk into fo using compressor ct. Write() is const and leaves
// the records in place; call Clear() before reusing the chunk.
bool Dump(Stream* fo, Compressor ct);
void Parse(Stream* fi, size_t offset);
bool Write(std::ostream& fo, Compressor ct) const;
// Resets the chunk to the empty state: no records and a byte total of zero.
void Clear() {
  num_bytes_ = 0;
  records_.clear();
}
void Parse(std::istream& sin);
size_t NumBytes() { return num_bytes_; }
const std::string Record(int i) { return records_[i]; }
// Returns a reference to the i-th record. The index is not range-checked.
const std::string& Record(int i) const {
  return records_[static_cast<size_t>(i)];
}

private:
std::forward_list<const std::string> records_;
std::vector<std::string> records_;
// sum of record lengths in bytes.
size_t num_bytes_;
DISABLE_COPY_AND_ASSIGN(Chunk);
Expand Down
46 changes: 20 additions & 26 deletions paddle/fluid/recordio/chunk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,28 @@ using namespace paddle::recordio;

// Round-trips two records through an uncompressed Write()/Parse() pair on an
// in-memory stream and checks the total record size afterwards.
//
// NOTE(review): the statements using Stream::Open, Dump and the two-argument
// Add/Parse look like lines of the removed file-based test shown by a diff
// view -- confirm before treating the body as compilable as-is.
TEST(Chunk, SaveLoad) {
Chunk ch;
ch.Add("12345", 6);
ch.Add("123", 4);
{
Stream* fs = Stream::Open("/tmp/record_11", "w");
ch.Dump(fs, Compressor::kNoCompress);
EXPECT_EQ(ch.NumBytes(), 0);
}
{
Stream* fs = Stream::Open("/tmp/record_11", "r");
ch.Parse(fs, 0);
EXPECT_EQ(ch.NumBytes(), 10);
}
// The explicit lengths include the literals' terminating NUL, so the two
// records are 6 and 4 bytes long.
ch.Add(std::string("12345", 6));
ch.Add(std::string("123", 4));
std::stringstream ss;
ch.Write(ss, Compressor::kNoCompress);
// Empty the chunk so the byte count below can only come from Parse().
ch.Clear();
ch.Parse(ss);
// 6 + 4 bytes.
ASSERT_EQ(ch.NumBytes(), 10U);
}

// Writes the same four records with and without snappy, compares the output
// sizes, then parses the snappy output back and checks the total record size.
//
// NOTE(review): the statements using Stream::Open, Dump and the two-argument
// Add/Parse look like lines of the removed file-based test shown by a diff
// view -- confirm before treating the body as compilable as-is.
TEST(Chunk, Compressor) {
Chunk ch;
ch.Add("12345", 6);
ch.Add("123", 4);
ch.Add("123", 4);
ch.Add("123", 4);
{
Stream* fs = Stream::Open("/tmp/record_12", "w");
ch.Dump(fs, Compressor::kSnappy);
EXPECT_EQ(ch.NumBytes(), 0);
}
{
Stream* fs = Stream::Open("/tmp/record_12", "r");
ch.Parse(fs, 0);
EXPECT_EQ(ch.NumBytes(), 10);
}
// The explicit lengths include the literals' terminating NUL: one 6-byte
// record and three 4-byte records.
ch.Add(std::string("12345", 6));
ch.Add(std::string("123", 4));
ch.Add(std::string("123", 4));
ch.Add(std::string("123", 4));
std::stringstream ss;
ch.Write(ss, Compressor::kSnappy);
std::stringstream ss2;
ch.Write(ss2, Compressor::kNoCompress);
ASSERT_LE(ss.tellp(), ss2.tellp()); // The snappy output must not be larger than the uncompressed output.

// Empty the chunk so the byte count below can only come from Parse().
ch.Clear();
ch.Parse(ss);
// 6 + 4 + 4 + 4 bytes.
ASSERT_EQ(ch.NumBytes(), 18);
}
Loading

0 comments on commit 6272b57

Please sign in to comment.