Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve shard concurrency #3251

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Builds/CMake/RippleConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ find_dependency (Boost 1.70
filesystem
program_options
regex
serialization
system
thread)
#[=========================================================[
Expand Down
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/Shard.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
main sources:
subdir: overlay
Expand Down
4 changes: 1 addition & 3 deletions Builds/CMake/RippledInterface.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ target_compile_definitions (opts
>
$<$<BOOL:${beast_no_unit_test_inline}>:BEAST_NO_UNIT_TEST_INLINE=1>
$<$<BOOL:${beast_disable_autolink}>:BEAST_DONT_AUTOLINK_TO_WIN32_LIBRARIES=1>
$<$<BOOL:${single_io_service_thread}>:RIPPLE_SINGLE_IO_SERVICE_THREAD=1>
# doesn't currently compile ? :
$<$<BOOL:${verify_nodeobject_keys}>:RIPPLE_VERIFY_NODEOBJECT_KEYS=1>)
$<$<BOOL:${single_io_service_thread}>:RIPPLE_SINGLE_IO_SERVICE_THREAD=1>)
target_compile_options (opts
INTERFACE
$<$<AND:$<BOOL:${is_gcc}>,$<COMPILE_LANGUAGE:CXX>>:-Wsuggest-override>
Expand Down
6 changes: 0 additions & 6 deletions Builds/CMake/RippledSettings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,6 @@ option (have_package_container
option (beast_no_unit_test_inline
"Prevents unit test definitions from being inserted into global table"
OFF)
# NOTE - THIS OPTION CURRENTLY DOES NOT COMPILE :
# TODO: fix or remove
option (verify_nodeobject_keys
"This verifies that the hash of node objects matches the payload. \
This check is expensive - use with caution."
OFF)
option (single_io_service_thread
"Restricts the number of threads calling io_service::run to one. \
This can be useful when debugging."
Expand Down
2 changes: 0 additions & 2 deletions Builds/CMake/deps/Boost.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ find_package (Boost 1.70 REQUIRED
filesystem
program_options
regex
serialization
system
thread)

Expand All @@ -69,7 +68,6 @@ target_link_libraries (ripple_boost
Boost::filesystem
Boost::program_options
Boost::regex
Boost::serialization
Boost::system
Boost::thread)
if (Boost_COMPILER)
Expand Down
1 change: 0 additions & 1 deletion Builds/containers/shared/install_boost.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ else
BLDARGS+=(--with-filesystem)
BLDARGS+=(--with-program_options)
BLDARGS+=(--with-regex)
BLDARGS+=(--with-serialization)
BLDARGS+=(--with-system)
BLDARGS+=(--with-atomic)
BLDARGS+=(--with-thread)
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/ledger/impl/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ InboundLedger::init(ScopedLockType& collectionLock)
if (mFailed)
return;
}
else if (shardStore && mSeq >= shardStore->earliestSeq())
else if (shardStore && mSeq >= shardStore->earliestLedgerSeq())
{
if (auto l = shardStore->fetchLedger(mHash, mSeq))
{
Expand Down
4 changes: 2 additions & 2 deletions src/ripple/app/ledger/impl/InboundLedgers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class InboundLedgersImp
if (reason == InboundLedger::Reason::HISTORY)
{
if (inbound->getLedger()->stateMap().family().isShardBacked())
app_.getNodeStore().copyLedger(inbound->getLedger());
app_.getNodeStore().storeLedger(inbound->getLedger());
}
else if (reason == InboundLedger::Reason::SHARD)
{
Expand All @@ -120,7 +120,7 @@ class InboundLedgersImp
if (inbound->getLedger()->stateMap().family().isShardBacked())
shardStore->setStored(inbound->getLedger());
else
shardStore->copyLedger(inbound->getLedger());
shardStore->storeLedger(inbound->getLedger());
}
return inbound->getLedger();
}
Expand Down
8 changes: 4 additions & 4 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1742,7 +1742,7 @@ LedgerMaster::fetchForHistory(
*hash, missing, reason);
if (!ledger &&
missing != fetch_seq_ &&
missing > app_.getNodeStore().earliestSeq())
missing > app_.getNodeStore().earliestLedgerSeq())
{
JLOG(m_journal.trace())
<< "fetchForHistory want fetch pack " << missing;
Expand Down Expand Up @@ -1771,7 +1771,7 @@ LedgerMaster::fetchForHistory(
mShardLedger = ledger;
}
if (!ledger->stateMap().family().isShardBacked())
app_.getShardStore()->copyLedger(ledger);
app_.getShardStore()->storeLedger(ledger);
}
else
{
Expand Down Expand Up @@ -1807,7 +1807,7 @@ LedgerMaster::fetchForHistory(
else
// Do not fetch ledger sequences lower
// than the earliest ledger sequence
fetchSz = app_.getNodeStore().earliestSeq();
fetchSz = app_.getNodeStore().earliestLedgerSeq();
fetchSz = missing >= fetchSz ?
std::min(ledger_fetch_size_, (missing - fetchSz) + 1) : 0;
try
Expand Down Expand Up @@ -1867,7 +1867,7 @@ void LedgerMaster::doAdvance (std::unique_lock<std::recursive_mutex>& sl)
std::lock_guard sll(mCompleteLock);
missing = prevMissing(mCompleteLedgers,
mPubLedger->info().seq,
app_.getNodeStore().earliestSeq());
app_.getNodeStore().earliestLedgerSeq());
}
if (missing)
{
Expand Down
63 changes: 32 additions & 31 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,9 @@ class ApplicationImp
// These are Stoppable-related
std::unique_ptr <JobQueue> m_jobQueue;
std::unique_ptr <NodeStore::Database> m_nodeStore;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
detail::AppFamily family_;
std::unique_ptr <detail::AppFamily> sFamily_;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
std::unique_ptr <detail::AppFamily> shardFamily_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr <PathRequests> m_pathRequests;
Expand Down Expand Up @@ -463,18 +463,18 @@ class ApplicationImp
m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
logs_->journal("JobQueue"), *logs_, *perfLog_))

, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))
, m_nodeStore (m_shaMapStore->makeNodeStore ("NodeStore.main", 4))

, family_ (*this, *m_nodeStore, *m_collectorManager)

// The shard store is optional and make_ShardStore can return null.
, shardStore_(make_ShardStore(
, shardStore_ (make_ShardStore (
*this,
*m_jobQueue,
m_nodeStoreScheduler,
4,
logs_->journal("ShardStore")))

, family_ (*this, *m_nodeStore, *m_collectorManager)

, m_orderBookDB (*this, *m_jobQueue)

, m_pathRequests (std::make_unique<PathRequests> (
Expand Down Expand Up @@ -558,14 +558,6 @@ class ApplicationImp
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
, grpcServer_(std::make_unique<GRPCServer>(*this))
{
if (shardStore_)
{
sFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);
}

add (m_resourceManager.get ());

//
Expand Down Expand Up @@ -626,7 +618,7 @@ class ApplicationImp

Family* shardFamily() override
{
return sFamily_.get();
return shardFamily_.get();
}

TimeKeeper&
Expand Down Expand Up @@ -943,7 +935,7 @@ class ApplicationImp
}

bool
initNodeStoreDBs()
initNodeStore()
{
if (config_->doImport)
{
Expand All @@ -961,12 +953,12 @@ class ApplicationImp

JLOG(j.warn()) <<
"Starting node import from '" << source->getName() <<
"' to '" << getNodeStore().getName() << "'.";
"' to '" << m_nodeStore->getName() << "'.";

using namespace std::chrono;
auto const start = steady_clock::now();

getNodeStore().import(*source);
m_nodeStore->import(*source);

auto const elapsed = duration_cast <seconds>
(steady_clock::now() - start);
Expand All @@ -990,14 +982,6 @@ class ApplicationImp
family().treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (sFamily_)
{
sFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
sFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
}

return true;
}

Expand Down Expand Up @@ -1252,8 +1236,8 @@ class ApplicationImp
// have listeners register for "onSweep ()" notification.

family().fullbelow().sweep();
if (sFamily_)
sFamily_->fullbelow().sweep();
if (shardFamily_)
shardFamily_->fullbelow().sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();
if (shardStore_)
Expand All @@ -1264,8 +1248,8 @@ class ApplicationImp
getInboundLedgers().sweep();
m_acceptedLedgerCache.sweep();
family().treecache().sweep();
if (sFamily_)
sFamily_->treecache().sweep();
if (shardFamily_)
shardFamily_->treecache().sweep();
cachedSLEs_.expire();

// Set timer to do another sweep later.
Expand Down Expand Up @@ -1350,9 +1334,26 @@ bool ApplicationImp::setup()
if (!config_->standalone())
timeKeeper_->run(config_->SNTP_SERVERS);

if (!initSQLiteDBs() || !initNodeStoreDBs())
if (!initSQLiteDBs() || !initNodeStore())
return false;

if (shardStore_)
{
shardFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);

using namespace std::chrono;
shardFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
shardFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (!shardStore_->init())
return false;
}

if (!peerReservations_->load(getWalletDB()))
{
JLOG(m_journal.fatal()) << "Cannot find peer reservations!";
Expand Down
41 changes: 31 additions & 10 deletions src/ripple/app/main/DBInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ namespace ripple {
////////////////////////////////////////////////////////////////////////////////

// Ledger database holds ledgers and ledger confirmations
static constexpr auto LgrDBName {"ledger.db"};
inline constexpr auto LgrDBName {"ledger.db"};

static constexpr
inline constexpr
std::array<char const*, 3> LgrDBPragma {{
"PRAGMA synchronous=NORMAL;",
"PRAGMA journal_mode=WAL;",
"PRAGMA journal_size_limit=1582080;"
}};

static constexpr
inline constexpr
std::array<char const*, 5> LgrDBInit {{
"BEGIN TRANSACTION;",

Expand All @@ -63,9 +63,9 @@ std::array<char const*, 5> LgrDBInit {{
////////////////////////////////////////////////////////////////////////////////

// Transaction database holds transactions and public keys
static constexpr auto TxDBName {"transaction.db"};
inline constexpr auto TxDBName {"transaction.db"};

static constexpr
inline constexpr
#if (ULONG_MAX > UINT_MAX) && !defined (NO_SQLITE_MMAP)
std::array<char const*, 6> TxDBPragma {{
#else
Expand All @@ -81,7 +81,7 @@ static constexpr
#endif
}};

static constexpr
inline constexpr
std::array<char const*, 8> TxDBInit {{
"BEGIN TRANSACTION;",

Expand Down Expand Up @@ -116,18 +116,39 @@ std::array<char const*, 8> TxDBInit {{

////////////////////////////////////////////////////////////////////////////////

// Temporary database used with an incomplete shard that is being acquired
inline constexpr auto AcquireShardDBName {"acquire.db"};

inline constexpr
std::array<char const*, 3> AcquireShardDBPragma {{
"PRAGMA synchronous=NORMAL;",
"PRAGMA journal_mode=WAL;",
"PRAGMA journal_size_limit=1582080;"
}};

inline constexpr
std::array<char const*, 1> AcquireShardDBInit {{
"CREATE TABLE IF NOT EXISTS Shard ( \
ShardIndex INTEGER PRIMARY KEY, \
LastLedgerHash CHARACTER(64), \
StoredLedgerSeqs BLOB \
);"
}};

////////////////////////////////////////////////////////////////////////////////

// Pragma for Ledger and Transaction databases with complete shards
static constexpr
std::array<char const*, 2> CompleteShardDBPragma {{
inline constexpr
std::array<char const*, 2> CompleteShardDBPragma{{
"PRAGMA synchronous=OFF;",
"PRAGMA journal_mode=OFF;"
}};

////////////////////////////////////////////////////////////////////////////////

static constexpr auto WalletDBName {"wallet.db"};
inline constexpr auto WalletDBName {"wallet.db"};

static constexpr
inline constexpr
std::array<char const*, 6> WalletDBInit {{
"BEGIN TRANSACTION;",

Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/main/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void printHelp (const po::options_description& desc)
" connect <ip> [<port>]\n"
" consensus_info\n"
" deposit_authorized <source_account> <destination_account> [<ledger>]\n"
" download_shard [[<index> <url>]] <validate>\n"
" download_shard [[<index> <url>]]\n"
" feature [<feature> [accept|reject]]\n"
" fetch_info [clear]\n"
" gateway_balances [<ledger>] <issuer_account> [ <hotwallet> [ <hotwallet> ]]\n"
Expand Down
5 changes: 3 additions & 2 deletions src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,15 +449,16 @@ SHAMapStoreImp::run()
std::string nextArchiveDir =
dbRotating_->getWritableBackend()->getName();
lastRotated = validatedSeq;
std::unique_ptr<NodeStore::Backend> oldBackend;
std::shared_ptr<NodeStore::Backend> oldBackend;
{
std::lock_guard lock (dbRotating_->peekMutex());

state_db_.setState (SavedState {newBackend->getName(),
nextArchiveDir, lastRotated});
clearCaches (validatedSeq);
oldBackend = dbRotating_->rotateBackends(
std::move(newBackend));
std::move(newBackend),
lock);
}
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;

Expand Down
Loading