Skip to content

Commit

Permalink
[ML] Fix issues upgrading state leading to possible abort of the auto…
Browse files Browse the repository at this point in the history
…detect process (elastic#136)
  • Loading branch information
tveasey committed Jul 3, 2018
1 parent 4d0b8f0 commit 0fa69d6
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 22 deletions.
2 changes: 1 addition & 1 deletion docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
* Add control message to start background persistence {ml-pull}19[#19]
* Fail start up if state is missing {ml-pull}4[#4]
* Do not log incorrect model memory limit {ml-pull}3[#3]
* The trend decomposition state wasn't being correctly upgraded potentially causing the autodetect process to abort {ml-pull}136[#136] (issue: {ml-issue}135[#135])

=== Regressions

=== Known Issues

5 changes: 4 additions & 1 deletion include/core/CStateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <cstddef>
#include <list>
#include <map>
#include <vector>

namespace ml {
Expand Down Expand Up @@ -67,6 +68,7 @@ class CORE_EXPORT CStateMachine {
using TSizeVec = std::vector<std::size_t>;
using TSizeVecVec = std::vector<TSizeVec>;
using TStrVec = std::vector<std::string>;
using TSizeSizeMap = std::map<std::size_t, std::size_t>;

public:
//! Set the number of machines we expect the program to use.
Expand All @@ -85,7 +87,8 @@ class CORE_EXPORT CStateMachine {
//! \name Persistence
//@{
//! Initialize by reading state from \p traverser.
bool acceptRestoreTraverser(CStateRestoreTraverser& traverser);
bool acceptRestoreTraverser(CStateRestoreTraverser& traverser,
const TSizeSizeMap& mapping = TSizeSizeMap());

//! Persist state by passing information to the supplied inserter.
void acceptPersistInserter(CStatePersistInserter& inserter) const;
Expand Down
22 changes: 16 additions & 6 deletions lib/core/CStateMachine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace core {
namespace {

// CStateMachine
const std::string MACHINE_TAG("a");
//const std::string MACHINE_TAG("a"); No longer used
const std::string STATE_TAG("b");

// CStateMachine::SMachine
Expand Down Expand Up @@ -88,17 +88,26 @@ CStateMachine CStateMachine::create(const TStrVec& alphabet,
return result;
}

bool CStateMachine::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser) {
bool CStateMachine::acceptRestoreTraverser(core::CStateRestoreTraverser& traverser,
const TSizeSizeMap& mapping) {
do {
const std::string& name = traverser.name();
RESTORE_BUILT_IN(MACHINE_TAG, m_Machine)
RESTORE_BUILT_IN(STATE_TAG, m_State)
} while (traverser.next());
if (mapping.size() > 0) {
auto mapped = mapping.find(m_State);
if (mapped != mapping.end()) {
m_State = mapped->second;
} else {
LOG_ERROR(<< "Bad mapping '" << core::CContainerPrinter::print(mapping)
<< "' state = " << m_State);
return false;
}
}
return true;
}

void CStateMachine::acceptPersistInserter(core::CStatePersistInserter& inserter) const {
inserter.insertValue(MACHINE_TAG, m_Machine);
inserter.insertValue(STATE_TAG, m_State);
}

Expand Down Expand Up @@ -201,14 +210,15 @@ void CStateMachine::CMachineDeque::capacity(std::size_t capacity) {
m_Capacity = capacity;
}

const CStateMachine::SMachine& CStateMachine::CMachineDeque::operator[](std::size_t pos) const {
const CStateMachine::SMachine& CStateMachine::CMachineDeque::operator[](std::size_t pos_) const {
std::size_t pos{pos_};
for (const auto& machines : m_Machines) {
if (pos < machines.size()) {
return machines[pos];
}
pos -= machines.size();
}
LOG_ABORT(<< "Invalid index '" << pos << "'");
LOG_ABORT(<< "Invalid index '" << pos_ << "'");
}

std::size_t CStateMachine::CMachineDeque::size() const {
Expand Down
7 changes: 4 additions & 3 deletions lib/core/unittest/CStateMachineTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ void CStateMachineTest::testPersist() {
core::CRapidXmlStateRestoreTraverser traverser(parser);

core::CStateMachine restored = core::CStateMachine::create(
machine[1].s_Alphabet, machine[1].s_States, machine[1].s_TransitionFunction,
machine[0].s_Alphabet, machine[0].s_States, machine[0].s_TransitionFunction,
0); // initial state
traverser.traverseSubLevel(
boost::bind(&core::CStateMachine::acceptRestoreTraverser, &restored, _1));
traverser.traverseSubLevel([&restored](core::CStateRestoreTraverser& traverser_) {
return restored.acceptRestoreTraverser(traverser_);
});

CPPUNIT_ASSERT_EQUAL(original.checksum(), restored.checksum());
std::string newXml;
Expand Down
44 changes: 33 additions & 11 deletions lib/maths/CTimeSeriesDecompositionDetail.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

#include <algorithm>
#include <cmath>
#include <map>
#include <numeric>
#include <string>
#include <vector>

Expand All @@ -57,6 +59,7 @@ using TBoolVec = std::vector<bool>;
using TDoubleVec = std::vector<double>;
using TSizeVec = std::vector<std::size_t>;
using TSizeVecVec = std::vector<TSizeVec>;
using TSizeSizeMap = std::map<std::size_t, std::size_t>;
using TStrVec = std::vector<std::string>;
using TTimeVec = std::vector<core_t::TTime>;
using TTimeTimePr = std::pair<core_t::TTime, core_t::TTime>;
Expand Down Expand Up @@ -307,7 +310,7 @@ const std::string LAST_UPDATE_OLD_TAG{"j"};

//////////////////////// Upgrade to Version 6.3 ////////////////////////

const double MODEL_WEIGHT_UPGRADING_TO_VERSION_6p3{48.0};
const double MODEL_WEIGHT_UPGRADING_TO_VERSION_6_3{48.0};

bool upgradeTrendModelToVersion6p3(const core_t::TTime bucketLength,
CTrendComponent& trend,
Expand All @@ -330,7 +333,7 @@ bool upgradeTrendModelToVersion6p3(const core_t::TTime bucketLength,

// Generate some samples from the old trend model.

double weight{MODEL_WEIGHT_UPGRADING_TO_VERSION_6p3 *
double weight{MODEL_WEIGHT_UPGRADING_TO_VERSION_6_3 *
static_cast<double>(bucketLength) / static_cast<double>(4 * WEEK)};

CPRNG::CXorOShiro128Plus rng;
Expand All @@ -343,6 +346,18 @@ bool upgradeTrendModelToVersion6p3(const core_t::TTime bucketLength,
return true;
}

// This implements the mapping from restored states to their best
// equivalents; specifically:
// SC_NEW_COMPONENTS |-> SC_NEW_COMPONENTS
// SC_NORMAL |-> SC_NORMAL
// SC_FORECASTING |-> SC_NORMAL
// SC_DISABLED |-> SC_DISABLED
// SC_ERROR |-> SC_ERROR
// Note that we don't try and restore the periodicity test state
// (see CTimeSeriesDecomposition::acceptRestoreTraverser) and the
// calendar test state is unchanged.
const TSizeSizeMap SC_STATES_UPGRADING_TO_VERSION_6_3{{0, 0}, {1, 1}, {2, 1}, {3, 2}, {4, 3}};

////////////////////////////////////////////////////////////////////////

// Constants
Expand Down Expand Up @@ -482,8 +497,9 @@ bool CTimeSeriesDecompositionDetail::CPeriodicityTest::acceptRestoreTraverser(
do {
const std::string& name{traverser.name()};
RESTORE(PERIODICITY_TEST_MACHINE_6_3_TAG,
traverser.traverseSubLevel(boost::bind(
&core::CStateMachine::acceptRestoreTraverser, &m_Machine, _1)))
traverser.traverseSubLevel([this](core::CStateRestoreTraverser& traverser_) {
return m_Machine.acceptRestoreTraverser(traverser_);
}))
RESTORE_SETUP_TEARDOWN(
SHORT_WINDOW_6_3_TAG, m_Windows[E_Short].reset(this->newWindow(E_Short)),
m_Windows[E_Short] && traverser.traverseSubLevel(boost::bind(
Expand Down Expand Up @@ -759,8 +775,9 @@ bool CTimeSeriesDecompositionDetail::CCalendarTest::acceptRestoreTraverser(core:
do {
const std::string& name{traverser.name()};
RESTORE(CALENDAR_TEST_MACHINE_6_3_TAG,
traverser.traverseSubLevel(boost::bind(
&core::CStateMachine::acceptRestoreTraverser, &m_Machine, _1)))
traverser.traverseSubLevel([this](core::CStateRestoreTraverser& traverser_) {
return m_Machine.acceptRestoreTraverser(traverser_);
}))
RESTORE_BUILT_IN(LAST_MONTH_6_3_TAG, m_LastMonth);
RESTORE_SETUP_TEARDOWN(
CALENDAR_TEST_6_3_TAG, m_Test.reset(new CCalendarCyclicTest(m_DecayRate)),
Expand Down Expand Up @@ -963,8 +980,9 @@ bool CTimeSeriesDecompositionDetail::CComponents::acceptRestoreTraverser(core::C
while (traverser.next()) {
const std::string& name{traverser.name()};
RESTORE(COMPONENTS_MACHINE_6_3_TAG,
traverser.traverseSubLevel(boost::bind(
&core::CStateMachine::acceptRestoreTraverser, &m_Machine, _1)));
traverser.traverseSubLevel([this](core::CStateRestoreTraverser& traverser_) {
return m_Machine.acceptRestoreTraverser(traverser_);
}))
RESTORE_BUILT_IN(DECAY_RATE_6_3_TAG, m_DecayRate);
RESTORE(TREND_6_3_TAG,
traverser.traverseSubLevel(boost::bind(
Expand Down Expand Up @@ -995,8 +1013,10 @@ bool CTimeSeriesDecompositionDetail::CComponents::acceptRestoreTraverser(core::C
do {
const std::string& name{traverser.name()};
RESTORE(COMPONENTS_MACHINE_OLD_TAG,
traverser.traverseSubLevel(boost::bind(
&core::CStateMachine::acceptRestoreTraverser, &m_Machine, _1)));
traverser.traverseSubLevel([this](core::CStateRestoreTraverser& traverser_) {
return m_Machine.acceptRestoreTraverser(
traverser_, SC_STATES_UPGRADING_TO_VERSION_6_3);
}))
RESTORE_SETUP_TEARDOWN(TREND_OLD_TAG,
/**/,
traverser.traverseSubLevel(boost::bind(
Expand All @@ -1017,7 +1037,7 @@ bool CTimeSeriesDecompositionDetail::CComponents::acceptRestoreTraverser(core::C
/**/)
} while (traverser.next());

m_MeanVarianceScale.add(1.0, MODEL_WEIGHT_UPGRADING_TO_VERSION_6p3);
m_MeanVarianceScale.add(1.0, MODEL_WEIGHT_UPGRADING_TO_VERSION_6_3);
}
return true;
}
Expand Down Expand Up @@ -1679,6 +1699,7 @@ bool CTimeSeriesDecompositionDetail::CComponents::SSeasonal::acceptRestoreTraver
RESTORE(ERRORS_OLD_TAG,
core::CPersistUtils::restore(ERRORS_OLD_TAG, s_PredictionErrors, traverser))
} while (traverser.next());
s_PredictionErrors.resize(s_Components.size());
}
return true;
}
Expand Down Expand Up @@ -1907,6 +1928,7 @@ bool CTimeSeriesDecompositionDetail::CComponents::SCalendar::acceptRestoreTraver
RESTORE(ERRORS_OLD_TAG,
core::CPersistUtils::restore(ERRORS_OLD_TAG, s_PredictionErrors, traverser))
} while (traverser.next());
s_PredictionErrors.resize(s_Components.size());
}
return true;
}
Expand Down
17 changes: 17 additions & 0 deletions lib/maths/unittest/CTimeSeriesDecompositionTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,9 @@ void CTimeSeriesDecompositionTest::testUpgrade() {
// Check we can validly upgrade existing state.

using TStrVec = std::vector<std::string>;
using TDouble3Vec = core::CSmallVector<double, 3>;
using TDouble3VecVec = std::vector<TDouble3Vec>;

auto load = [](const std::string& name, std::string& result) {
std::ifstream file;
file.open(name);
Expand Down Expand Up @@ -2186,6 +2189,13 @@ void CTimeSeriesDecompositionTest::testUpgrade() {
CPPUNIT_ASSERT_DOUBLES_EQUAL(expectedScale.second, scale.second,
0.005 * std::max(expectedScale.second, 0.4));
}

// Check some basic operations on the upgraded model.
TDouble3VecVec forecast;
decomposition.forecast(60480000, 60480000 + WEEK, HALF_HOUR, 90.0, 1.0, forecast);
for (core_t::TTime time = 60480000; time < 60480000 + WEEK; time += HALF_HOUR) {
decomposition.addPoint(time, 10.0);
}
}

LOG_DEBUG(<< "*** Trend and Seasonal Components ***");
Expand Down Expand Up @@ -2268,6 +2278,13 @@ void CTimeSeriesDecompositionTest::testUpgrade() {
LOG_DEBUG(<< "Mean scale error = " << maths::CBasicStatistics::mean(meanScaleError));
CPPUNIT_ASSERT(maths::CBasicStatistics::mean(meanValueError) < 0.06);
CPPUNIT_ASSERT(maths::CBasicStatistics::mean(meanScaleError) < 0.07);

// Check some basic operations on the upgraded model.
TDouble3VecVec forecast;
decomposition.forecast(10366200, 10366200 + WEEK, HALF_HOUR, 90.0, 1.0, forecast);
for (core_t::TTime time = 60480000; time < 60480000 + WEEK; time += HALF_HOUR) {
decomposition.addPoint(time, 10.0);
}
}
}

Expand Down

0 comments on commit 0fa69d6

Please sign in to comment.