Skip to content

Commit

Permalink
Schedule event delivery when event buffer is overfilled (#12781)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhanw-google authored Dec 16, 2021
1 parent 9a595a4 commit 41700bd
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 29 deletions.
28 changes: 14 additions & 14 deletions src/app/EventManagement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,12 @@ CHIP_ERROR EventManagement::LogEventPrivate(EventLoggingDelegate * apDelegate, E
EventNumber & aEventNumber)
{
CircularTLVWriter writer;
CHIP_ERROR err = CHIP_NO_ERROR;
uint32_t requestSize = 0;
aEventNumber = 0;
CircularEventBuffer checkpoint = *mpEventBuffer;
CircularEventBuffer * buffer = nullptr;
EventLoadOutContext ctxt = EventLoadOutContext(writer, aEventOptions.mPriority, mLastEventNumber);
CHIP_ERROR err = CHIP_NO_ERROR;
uint32_t requestSize = 0;
aEventNumber = 0;
CircularTLVWriter checkpoint = writer;
CircularEventBuffer * buffer = nullptr;
EventLoadOutContext ctxt = EventLoadOutContext(writer, aEventOptions.mPriority, mLastEventNumber);
EventOptions opts;
#if CHIP_CONFIG_EVENT_LOGGING_UTC_TIMESTAMPS & CHIP_SYSTEM_CONFIG_PLATFORM_PROVIDES_TIME
Timestamp timestamp;
Expand Down Expand Up @@ -525,7 +525,8 @@ CHIP_ERROR EventManagement::LogEventPrivate(EventLoggingDelegate * apDelegate, E
exit:
if (err != CHIP_NO_ERROR)
{
*mpEventBuffer = checkpoint;
ChipLogError(EventLogging, "Log event with error %s", ErrorStr(err));
writer = checkpoint;
}
else if (opts.mPriority >= CHIP_CONFIG_EVENT_GLOBAL_PRIORITY)
{
Expand All @@ -534,17 +535,15 @@ CHIP_ERROR EventManagement::LogEventPrivate(EventLoggingDelegate * apDelegate, E
mLastEventTimestamp = timestamp;
#if CHIP_CONFIG_EVENT_LOGGING_VERBOSE_DEBUG_LOGS
ChipLogDetail(EventLogging,
"LogEvent event number: 0x" ChipLogFormatX64 " schema priority: %u, endpoint id: 0x%" PRIx16
"LogEvent event number: 0x" ChipLogFormatX64 " priority: %u, endpoint id: 0x%" PRIx16
" cluster id: " ChipLogFormatMEI " event id: 0x%" PRIx32 " %s timestamp: 0x" ChipLogFormatX64,
ChipLogValueX64(aEventNumber), static_cast<unsigned>(opts.mPriority), opts.mPath.mEndpointId,
ChipLogValueMEI(opts.mPath.mClusterId), opts.mPath.mEventId,
opts.mTimestamp.mType == Timestamp::Type::kSystem ? "Sys" : "Epoch", ChipLogValueX64(opts.mTimestamp.mValue));
#endif // CHIP_CONFIG_EVENT_LOGGING_VERBOSE_DEBUG_LOGS

if (opts.mUrgent == EventOptions::Type::kUrgent)
{
err = InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleUrgentEventDelivery(opts.mPath);
}
err = InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleEventDelivery(opts.mPath, opts.mUrgent,
mBytesWritten);
}

return err;
Expand Down Expand Up @@ -792,13 +791,14 @@ CHIP_ERROR EventManagement::EvictEvent(CHIPCircularTLVBuffer & apBuffer, void *
return CHIP_END_OF_TLV;
}

void EventManagement::SetScheduledEventNumber(EventNumber & aEventNumber)
void EventManagement::SetScheduledEventInfo(EventNumber & aEventNumber, uint32_t & aInitialWrittenEventBytes)
{
#if !CHIP_SYSTEM_CONFIG_NO_LOCKING
ScopedLock lock(sInstance);
#endif // !CHIP_SYSTEM_CONFIG_NO_LOCKING

aEventNumber = mLastEventNumber;
aEventNumber = mLastEventNumber;
aInitialWrittenEventBytes = mBytesWritten;
}

void CircularEventBuffer::Init(uint8_t * apBuffer, uint32_t aBufferLength, CircularEventBuffer * apPrev,
Expand Down
4 changes: 2 additions & 2 deletions src/app/EventManagement.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,9 @@ class EventManagement
bool IsValid(void) { return EventManagementStates::Shutdown != mState; };

/**
* Logger would save last logged event number for each logger buffer into schedule event number array
* Logger would save last logged event number and initial written event bytes number into schedule event number array
*/
void SetScheduledEventNumber(EventNumber & aEventNumber);
void SetScheduledEventInfo(EventNumber & aEventNumber, uint32_t & aInitialWrittenEventBytes);

private:
void VendEventNumber();
Expand Down
24 changes: 13 additions & 11 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@ CHIP_ERROR ReadHandler::Init(Messaging::ExchangeManager * apExchangeMgr, Interac
mLastScheduledEventNumber = 0;
mIsPrimingReports = true;
MoveToState(HandlerState::Initialized);
mpDelegate = apDelegate;
mSubscriptionId = 0;
mHoldReport = false;
mDirty = false;
mActiveSubscription = false;
mIsChunkedReport = false;
mInteractionType = aInteractionType;
mInitiatorNodeId = apExchangeContext->GetSessionHandle().GetPeerNodeId();
mSubjectDescriptor = apExchangeContext->GetSessionHandle().GetSubjectDescriptor();
mHoldSync = false;
mpDelegate = apDelegate;
mSubscriptionId = 0;
mHoldReport = false;
mDirty = false;
mActiveSubscription = false;
mIsChunkedReport = false;
mInteractionType = aInteractionType;
mInitiatorNodeId = apExchangeContext->GetSessionHandle().GetPeerNodeId();
mSubjectDescriptor = apExchangeContext->GetSessionHandle().GetSubjectDescriptor();
mHoldSync = false;
mLastWrittenEventsBytes = 0;
if (apExchangeContext != nullptr)
{
apExchangeContext->SetDelegate(this);
Expand Down Expand Up @@ -115,6 +116,7 @@ void ReadHandler::Shutdown(ShutdownOptions aOptions)
mIsChunkedReport = false;
mInitiatorNodeId = kUndefinedNodeId;
mHoldSync = false;
mLastWrittenEventsBytes = 0;
}

CHIP_ERROR ReadHandler::OnReadInitialRequest(System::PacketBufferHandle && aPayload)
Expand Down Expand Up @@ -530,7 +532,7 @@ bool ReadHandler::CheckEventClean(EventManagement & aEventManager)
if ((lastEventNumber != 0) && (mEventMin <= lastEventNumber))
{
// We have more events. snapshot last event number
aEventManager.SetScheduledEventNumber(mLastScheduledEventNumber);
aEventManager.SetScheduledEventInfo(mLastScheduledEventNumber, mLastWrittenEventsBytes);
return false;
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class ReadHandler : public Messaging::ExchangeDelegate

const AttributeValueEncoder::AttributeEncodeState & GetAttributeEncodeState() const { return mAttributeEncoderState; }
void SetAttributeEncodeState(const AttributeValueEncoder::AttributeEncodeState & aState) { mAttributeEncoderState = aState; }
uint32_t GetLastWrittenEventsBytes() { return mLastWrittenEventsBytes; }

private:
friend class TestReadInteraction;
Expand Down Expand Up @@ -226,6 +227,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);
bool mIsFabricFiltered = false;
bool mHoldSync = false;
uint32_t mLastWrittenEventsBytes = 0;
SubjectDescriptor mSubjectDescriptor;
// The detailed encoding state for a single attribute, used by list chunking feature.
AttributeValueEncoder::AttributeEncodeState mAttributeEncoderState;
Expand Down
49 changes: 49 additions & 0 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,23 +560,72 @@ void Engine::OnReportConfirm()
ChipLogDetail(DataManagement, "<RE> OnReportConfirm: NumReports = %" PRIu32, mNumReportsInFlight);
}

void Engine::GetMinEventLogPosition(uint32_t & aMinLogPosition)
{
for (auto & handler : InteractionModelEngine::GetInstance()->mReadHandlers)
{
if (handler.IsFree() || handler.IsReadType())
{
continue;
}

uint32_t initialWrittenEventsBytes = handler.GetLastWrittenEventsBytes();
if (initialWrittenEventsBytes < aMinLogPosition)
{
aMinLogPosition = initialWrittenEventsBytes;
}
}
}

CHIP_ERROR Engine::ScheduleBufferPressureEventDelivery(uint32_t aBytesWritten)
{
uint32_t minEventLogPosition = aBytesWritten;
GetMinEventLogPosition(minEventLogPosition);
if (aBytesWritten - minEventLogPosition > CHIP_CONFIG_EVENT_LOGGING_BYTE_THRESHOLD)
{
ChipLogProgress(DataManagement, "<RE> Buffer overfilled CHIP_CONFIG_EVENT_LOGGING_BYTE_THRESHOLD %d, schedule engine run",
CHIP_CONFIG_EVENT_LOGGING_BYTE_THRESHOLD);
return ScheduleRun();
}
return CHIP_NO_ERROR;
}

CHIP_ERROR Engine::ScheduleUrgentEventDelivery(ConcreteEventPath & aPath)
{
for (auto & handler : InteractionModelEngine::GetInstance()->mReadHandlers)
{
if (handler.IsFree() || handler.IsReadType())
{
continue;
}

for (auto clusterInfo = handler.GetEventClusterInfolist(); clusterInfo != nullptr; clusterInfo = clusterInfo->mpNext)
{
if (clusterInfo->IsEventPathSupersetOf(aPath))
{
ChipLogProgress(DataManagement, "<RE> Unblock Urgent Event Delivery for readHandler[%d]",
InteractionModelEngine::GetInstance()->GetReadHandlerArrayIndex(&handler));
handler.UnblockUrgentEventDelivery();
break;
}
}
}
return ScheduleRun();
}

CHIP_ERROR Engine::ScheduleEventDelivery(ConcreteEventPath & aPath, EventOptions::Type aUrgent, uint32_t aBytesWritten)
{
if (aUrgent != EventOptions::Type::kUrgent)
{
return ScheduleBufferPressureEventDelivery(aBytesWritten);
}
else
{
return ScheduleUrgentEventDelivery(aPath);
}
return CHIP_NO_ERROR;
}

}; // namespace reporting
} // namespace app
} // namespace chip
Expand Down
8 changes: 6 additions & 2 deletions src/app/reporting/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ class Engine

/**
* @brief
* Schedule the urgent event delivery
* Schedule the event delivery
*
*/
CHIP_ERROR ScheduleUrgentEventDelivery(ConcreteEventPath & aPath);
CHIP_ERROR ScheduleEventDelivery(ConcreteEventPath & aPath, EventOptions::Type aUrgent, uint32_t aBytesWritten);

private:
friend class TestReportingEngine;
Expand Down Expand Up @@ -131,6 +131,10 @@ class Engine
*/
static void Run(System::Layer * aSystemLayer, void * apAppState);

CHIP_ERROR ScheduleUrgentEventDelivery(ConcreteEventPath & aPath);
CHIP_ERROR ScheduleBufferPressureEventDelivery(uint32_t aBytesWritten);
void GetMinEventLogPosition(uint32_t & aMinLogPosition);

/**
* Boolean to indicate if ScheduleRun is pending. This flag is used to prevent calling ScheduleRun multiple times
* within the same execution context to avoid applying too much pressure on platforms that use small, fixed size event queues.
Expand Down
20 changes: 20 additions & 0 deletions src/lib/core/CHIPConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -2759,6 +2759,26 @@ extern const char CHIP_NON_PRODUCTION_MARKER[];
#define CHIP_CONFIG_CASE_SESSION_RESUME_CACHE_SIZE 4
#endif

/**
* @def CHIP_CONFIG_EVENT_LOGGING_BYTE_THRESHOLD
*
* @brief The number of bytes written to the event logging system that
* will trigger Report Delivery.
*
* The configuration captures the number of bytes written to the event
* logging subsystem needed to trigger a report. For example, if an application wants to offload all DEBUG events
* reliably, the threshold should be set to less than the size of the
* DEBUG buffer (plus a slop factor to account for events generated
* during the scheduling and event offload). Similarly, if the
* application does not want to drop INFO events, the threshold should
* be set to the sum of DEBUG and INFO buffers (with the same
* correction).
*
*/
#ifndef CHIP_CONFIG_EVENT_LOGGING_BYTE_THRESHOLD
#define CHIP_CONFIG_EVENT_LOGGING_BYTE_THRESHOLD 512
#endif /* CHIP_CONFIG_EVENT_LOGGING_BYTE_THRESHOLD */

/**
* @def CHIP_CONFIG_ENABLE_SERVER_IM_EVENT
*
Expand Down

0 comments on commit 41700bd

Please sign in to comment.