diff --git a/test/test_rosbag_storage/CMakeLists.txt b/test/test_rosbag_storage/CMakeLists.txt index 0ced5bf4ea..fa2438d452 100644 --- a/test/test_rosbag_storage/CMakeLists.txt +++ b/test/test_rosbag_storage/CMakeLists.txt @@ -15,4 +15,8 @@ if(CATKIN_ENABLE_TESTING) if(TARGET create_and_iterate_bag) target_link_libraries(create_and_iterate_bag ${catkin_LIBRARIES}) endif() + catkin_add_gtest(swap_bags src/swap_bags.cpp) + if(TARGET swap_bags) + target_link_libraries(swap_bags ${catkin_LIBRARIES}) + endif() endif() diff --git a/test/test_rosbag_storage/src/swap_bags.cpp b/test/test_rosbag_storage/src/swap_bags.cpp new file mode 100644 index 0000000000..3940f18711 --- /dev/null +++ b/test/test_rosbag_storage/src/swap_bags.cpp @@ -0,0 +1,101 @@ +#include "ros/time.h" +#include "rosbag/bag.h" +#include "rosbag/view.h" +#include "std_msgs/Int32.h" + +#include "boost/foreach.hpp" +#include + +void writeBags(rosbag::CompressionType a, rosbag::CompressionType b) { + using std::swap; + rosbag::Bag bag1("/tmp/swap1.bag", rosbag::bagmode::Write); + rosbag::Bag bag2("/tmp/swap2.bag", rosbag::bagmode::Write); + + // In the end "/tmp/swap1.bag" should have CompressionType a and contain two messages of value a. + // "/tmp/swap2.bag" should have CompressionType b and contain two messages of value b. + // We use these pointers to track the bags accordingly. + + rosbag::Bag* a_bag = &bag1; + rosbag::Bag* b_bag = &bag2; + + std_msgs::Int32 a_msg, b_msg; + a_msg.data = a; + b_msg.data = b; + + swap(bag1, bag2); + swap(a_bag, b_bag); + + a_bag->setCompression(a); + b_bag->setCompression(b); + + swap(bag1, bag2); + swap(a_bag, b_bag); + + a_bag->write("/data", ros::Time::now(), a_msg); + b_bag->write("/data", ros::Time::now(), b_msg); + + swap(bag1, bag2); + swap(a_bag, b_bag); + + a_bag->write("/data", ros::Time::now(), a_msg); + b_bag->write("/data", ros::Time::now(), b_msg); + + swap(bag1, bag2); + + bag1.close(); + bag2.close(); + + swap(bag1, bag2); +} + +void readBags(rosbag::CompressionType a, rosbag::CompressionType b) { + using std::swap; + rosbag::Bag bag1("/tmp/swap1.bag", rosbag::bagmode::Read); + rosbag::Bag bag2("/tmp/swap2.bag", rosbag::bagmode::Read); + + rosbag::Bag* a_bag = &bag1; + rosbag::Bag* b_bag = &bag2; + + swap(bag1, bag2); + swap(a_bag, b_bag); + + // only valid when writing + //EXPECT_EQ(a_bag->getCompression(), a); + //EXPECT_EQ(b_bag->getCompression(), b); + + std::vector topics; + topics.push_back("data"); + + rosbag::View a_view(*a_bag, rosbag::TopicQuery(topics)); + rosbag::View b_view(*b_bag, rosbag::TopicQuery(topics)); + + BOOST_FOREACH(rosbag::MessageInstance const m, a_view) + { + std_msgs::Int32::ConstPtr i = m.instantiate(); + ASSERT_TRUE(i); + EXPECT_EQ(i->data, a); + } + BOOST_FOREACH(rosbag::MessageInstance const m, b_view) + { + std_msgs::Int32::ConstPtr i = m.instantiate(); + ASSERT_TRUE(i); + EXPECT_EQ(i->data, b); + } +} + +TEST(rosbag_storage, swap_bags) +{ + for (int i = 0; i < 3; ++i) { + for (int j = 0; j < 3; ++j) { + writeBags(rosbag::CompressionType(i), rosbag::CompressionType(j)); + readBags(rosbag::CompressionType(i), rosbag::CompressionType(j)); + } + } +} + +int main(int argc, char **argv) { + ros::Time::init(); + + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/rosbag_storage/include/rosbag/bag.h b/tools/rosbag_storage/include/rosbag/bag.h index e8a346ef8b..14ad8a2c5d 100644 --- a/tools/rosbag_storage/include/rosbag/bag.h +++ b/tools/rosbag_storage/include/rosbag/bag.h @@ -171,7 +171,12 @@ class ROSBAG_DECL Bag void write(std::string const& topic, ros::Time const& time, boost::shared_ptr const& msg, boost::shared_ptr connection_header = boost::shared_ptr()); + void swap(Bag&); + private: + Bag(const Bag&); + Bag& operator=(const Bag&); + // This helper function actually does the write with an arbitrary serializable message template void doWrite(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr const& connection_header); @@ -621,6 +626,10 @@ void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T cons curr_chunk_info_.start_time = time; } +inline void swap(Bag& a, Bag& b) { + a.swap(b); +} + } // namespace rosbag #endif diff --git a/tools/rosbag_storage/include/rosbag/buffer.h b/tools/rosbag_storage/include/rosbag/buffer.h index 5f9cf1a784..aef6c99e4b 100644 --- a/tools/rosbag_storage/include/rosbag/buffer.h +++ b/tools/rosbag_storage/include/rosbag/buffer.h @@ -51,8 +51,11 @@ class ROSBAG_DECL Buffer uint32_t getSize() const; void setSize(uint32_t size); + void swap(Buffer& other); private: + Buffer(const Buffer&); + Buffer& operator=(const Buffer&); void ensureCapacity(uint32_t capacity); private: @@ -61,6 +64,10 @@ class ROSBAG_DECL Buffer uint32_t size_; }; +inline void swap(Buffer& a, Buffer& b) { + a.swap(b); +} + } // namespace rosbag #endif diff --git a/tools/rosbag_storage/include/rosbag/chunked_file.h b/tools/rosbag_storage/include/rosbag/chunked_file.h index c1c8cda4b4..722aeca3ea 100644 --- a/tools/rosbag_storage/include/rosbag/chunked_file.h +++ b/tools/rosbag_storage/include/rosbag/chunked_file.h @@ -79,8 +79,12 @@ class ROSBAG_DECL ChunkedFile bool truncate(uint64_t length); void seek(uint64_t offset, int origin = std::ios_base::beg); //!< seek to given offset from origin void decompress(CompressionType compression, uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len); + void swap(ChunkedFile& other); private: + ChunkedFile(const ChunkedFile&); + ChunkedFile& operator=(const ChunkedFile&); + void open(std::string const& filename, std::string const& mode); void clearUnused(); @@ -98,6 +102,10 @@ class ROSBAG_DECL ChunkedFile boost::shared_ptr write_stream_; }; +inline void swap(ChunkedFile& a, ChunkedFile& b) { + a.swap(b); +} + } // namespace rosbag #endif diff --git a/tools/rosbag_storage/include/rosbag/stream.h b/tools/rosbag_storage/include/rosbag/stream.h index 1f563819eb..2ef7fac3ea 100644 --- a/tools/rosbag_storage/include/rosbag/stream.h +++ b/tools/rosbag_storage/include/rosbag/stream.h @@ -63,8 +63,11 @@ typedef compression::CompressionType CompressionType; class ChunkedFile; +class FileAccessor; + class ROSBAG_DECL Stream { + friend class FileAccessor; public: Stream(ChunkedFile* file); virtual ~Stream(); @@ -110,6 +113,13 @@ class ROSBAG_DECL StreamFactory boost::shared_ptr lz4_stream_; }; +class FileAccessor { + friend class ChunkedFile; + static void setFile(Stream& a, ChunkedFile* file) { + a.file_ = file; + } +}; + class ROSBAG_DECL UncompressedStream : public Stream { public: @@ -173,6 +183,8 @@ class ROSBAG_DECL LZ4Stream : public Stream void decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len); private: + LZ4Stream(const LZ4Stream&); + LZ4Stream operator=(const LZ4Stream&); void writeStream(int action); char *buff_; diff --git a/tools/rosbag_storage/src/bag.cpp b/tools/rosbag_storage/src/bag.cpp index 81df861c0d..281f326ce7 100644 --- a/tools/rosbag_storage/src/bag.cpp +++ b/tools/rosbag_storage/src/bag.cpp @@ -1117,4 +1117,35 @@ void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); } void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); } +void Bag::swap(Bag& other) { + using std::swap; + swap(mode_, other.mode_); + swap(file_, other.file_); + swap(version_, other.version_); + swap(compression_, other.compression_); + swap(chunk_threshold_, other.chunk_threshold_); + swap(bag_revision_, other.bag_revision_); + swap(file_size_, other.file_size_); + swap(file_header_pos_, other.file_header_pos_); + swap(index_data_pos_, other.index_data_pos_); + swap(connection_count_, other.connection_count_); + swap(chunk_count_, other.chunk_count_); + swap(chunk_open_, other.chunk_open_); + swap(curr_chunk_info_, other.curr_chunk_info_); + swap(curr_chunk_data_pos_, other.curr_chunk_data_pos_); + swap(topic_connection_ids_, other.topic_connection_ids_); + swap(header_connection_ids_, other.header_connection_ids_); + swap(connections_, other.connections_); + swap(chunks_, other.chunks_); + swap(connection_indexes_, other.connection_indexes_); + swap(curr_chunk_connection_indexes_, other.curr_chunk_connection_indexes_); + swap(header_buffer_, other.header_buffer_); + swap(record_buffer_, other.record_buffer_); + swap(chunk_buffer_, other.chunk_buffer_); + swap(decompress_buffer_, other.decompress_buffer_); + swap(outgoing_chunk_buffer_, other.outgoing_chunk_buffer_); + swap(current_buffer_, other.current_buffer_); + swap(decompressed_chunk_, other.decompressed_chunk_); +} + } // namespace rosbag diff --git a/tools/rosbag_storage/src/buffer.cpp b/tools/rosbag_storage/src/buffer.cpp index b3d217a8fc..f51e624cad 100644 --- a/tools/rosbag_storage/src/buffer.cpp +++ b/tools/rosbag_storage/src/buffer.cpp @@ -34,6 +34,7 @@ #include #include +#include #include "rosbag/buffer.h" @@ -71,4 +72,11 @@ void Buffer::ensureCapacity(uint32_t capacity) { assert(buffer_); } +void Buffer::swap(Buffer& other) { + using std::swap; + swap(buffer_, other.buffer_); + swap(capacity_, other.capacity_); + swap(size_, other.size_); +} + } // namespace rosbag diff --git a/tools/rosbag_storage/src/chunked_file.cpp b/tools/rosbag_storage/src/chunked_file.cpp index 533c37b78a..9eb665826f 100644 --- a/tools/rosbag_storage/src/chunked_file.cpp +++ b/tools/rosbag_storage/src/chunked_file.cpp @@ -223,4 +223,33 @@ void ChunkedFile::clearUnused() { nUnused_ = 0; } +void ChunkedFile::swap(ChunkedFile& other) { + using std::swap; + using boost::swap; + swap(filename_, other.filename_); + swap(file_, other.file_); + swap(offset_, other.offset_); + swap(compressed_in_, other.compressed_in_); + swap(unused_, other.unused_); + swap(nUnused_, other.nUnused_); + + swap(stream_factory_, other.stream_factory_); + + FileAccessor::setFile(*stream_factory_->getStream(compression::Uncompressed), this); + FileAccessor::setFile(*stream_factory_->getStream(compression::BZ2), this); + FileAccessor::setFile(*stream_factory_->getStream(compression::LZ4), this); + + FileAccessor::setFile(*other.stream_factory_->getStream(compression::Uncompressed), &other); + FileAccessor::setFile(*other.stream_factory_->getStream(compression::BZ2), &other); + FileAccessor::setFile(*other.stream_factory_->getStream(compression::LZ4), &other); + + swap(read_stream_, other.read_stream_); + FileAccessor::setFile(*read_stream_, this); + FileAccessor::setFile(*other.read_stream_, &other); + + swap(write_stream_, other.write_stream_); + FileAccessor::setFile(*write_stream_, this); + FileAccessor::setFile(*other.write_stream_, &other); +} + } // namespace rosbag