Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suppress various spammy trace events #56

Merged
merged 3 commits into from
Mar 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fdbclient/FileBackupAgent.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1539,7 +1539,7 @@ namespace fileBackup {
.detail("ScheduledVersion", scheduledVersion)
.detail("BeginKey", range.begin.printable())
.detail("EndKey", range.end.printable())
.suppressFor(2, true);
.suppressFor(2);
}
else {
// This shouldn't happen because if the transaction was already done or if another execution
Expand Down
4 changes: 2 additions & 2 deletions fdbrpc/AsyncFileEIO.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
TraceEvent(notFound ? SevWarn : SevWarnAlways, "FileOpenError").error(e).GetLastError().detail("File", filename).detail("Flags", flags).detail("Mode", mode);
throw e;
}
TraceEvent("AsyncFileOpened").detail("Filename", filename).detail("fd", r->result).detail("Flags", flags);
TraceEvent("AsyncFileOpened").detail("Filename", filename).detail("fd", r->result).detail("Flags", flags).suppressFor(1.0);

if ((flags & OPEN_LOCK) && !lock_fd(r->result)) {
TraceEvent(SevError, "UnableToLockFile").detail("filename", filename).GetLastError();
Expand Down Expand Up @@ -264,7 +264,7 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
state eio_req* r = eio_close(fd, 0, eio_callback, &p);
Void _ = wait( p.getFuture() );
if (r->result) error( "CloseError", fd, r );
TraceEvent("AsyncFileClosed").detail("fd", fd);
TraceEvent("AsyncFileClosed").detail("fd", fd).suppressFor(1.0);
}

ACTOR static Future<int> read_impl( int fd, void* data, int length, int64_t offset ) {
Expand Down
2 changes: 1 addition & 1 deletion fdbrpc/FailureMonitor.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStat

void SimpleFailureMonitor::endpointNotFound( Endpoint const& endpoint ) {
// SOMEDAY: Expiration (this "leaks" memory)
TraceEvent("EndpointNotFound").detail("Address", endpoint.address).detail("Token", endpoint.token);
TraceEvent("EndpointNotFound").detail("Address", endpoint.address).detail("Token", endpoint.token).suppressFor(1.0);
endpointKnownFailed.set( endpoint, true );
}

Expand Down
16 changes: 8 additions & 8 deletions fdbrpc/FlowTransport.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,14 +369,14 @@ struct Peer : NonCopyable {
Void _ = wait( delayJittered( std::max(0.0, self->lastConnectTime+self->reconnectionDelay - now()) ) ); // Don't connect() to the same peer more than once per 2 sec
self->lastConnectTime = now();

TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination);
TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination).suppressFor(1.0);
Reference<IConnection> _conn = wait( timeout( INetworkConnections::net()->connect(self->destination), FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Reference<IConnection>() ) );
if (_conn) {
conn = _conn;
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()).detail("PeerAddr", self->destination);
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()).detail("PeerAddr", self->destination).suppressFor(1.0);
self->prependConnectPacket();
} else {
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination);
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination).suppressFor(1.0);
throw connection_failed();
}

Expand Down Expand Up @@ -408,7 +408,7 @@ struct Peer : NonCopyable {
bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || ( g_network->isSimulated() && e.code() == error_code_checksum_failed );

if(self->compatible) {
TraceEvent(ok ? SevInfo : SevError, "ConnectionClosed", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination).error(e, true);
TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination).error(e, true).suppressFor(1.0);
}
else {
TraceEvent(ok ? SevInfo : SevError, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).detail("PeerAddr", self->destination).error(e, true);
Expand Down Expand Up @@ -637,7 +637,7 @@ ACTOR static Future<Void> connectionReader(
compatible = true;
TraceEvent("ConnectionEstablished", conn->getDebugID())
.detail("Peer", conn->getPeerAddress())
.detail("ConnectionId", connectionId);
.detail("ConnectionId", connectionId).suppressFor(1.0);
}

if(connectionId > 1) {
Expand All @@ -649,7 +649,7 @@ ACTOR static Future<Void> connectionReader(
peerProtocolVersion = p->protocolVersion;
if (peer != nullptr) {
// Outgoing connection; port information should be what we expect
TraceEvent("ConnectedOutgoing").detail("PeerAddr", NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) );
TraceEvent("ConnectedOutgoing").detail("PeerAddr", NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) ).suppressFor(1.0);
peer->compatible = compatible;
if (!compatible)
peer->transport->numIncompatibleConnections++;
Expand Down Expand Up @@ -710,7 +710,7 @@ ACTOR static Future<Void> connectionIncoming( TransportData* self, Reference<ICo
}
return Void();
} catch (Error& e) {
TraceEvent("IncomingConnectionError", conn->getDebugID()).error(e).detail("FromAddress", conn->getPeerAddress());
TraceEvent("IncomingConnectionError", conn->getDebugID()).error(e).detail("FromAddress", conn->getPeerAddress()).suppressFor(1.0);
conn->close();
return Void();
}
Expand All @@ -722,7 +722,7 @@ ACTOR static Future<Void> listen( TransportData* self, NetworkAddress listenAddr
try {
loop {
Reference<IConnection> conn = wait( listener->accept() );
TraceEvent("ConnectionFrom", conn->getDebugID()).detail("FromAddress", conn->getPeerAddress());
TraceEvent("ConnectionFrom", conn->getDebugID()).detail("FromAddress", conn->getPeerAddress()).suppressFor(1.0);
incoming.add( connectionIncoming(self, conn) );
}
} catch (Error& e) {
Expand Down
5 changes: 2 additions & 3 deletions fdbserver/DataDistribution.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,6 @@ struct DDTeamCollection {
}

if( foundExact || (req.wantsTrueBest && bestOption.present() ) ) {
TraceEvent("getTeam").detail("wantsVariety", req.wantsNewServers).detail("bestOption", bestOption.get()->getDesc());
ASSERT( bestOption.present() );
req.reply.send( bestOption );
return Void();
Expand Down Expand Up @@ -1708,7 +1707,7 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection *self, Reference<AsyncVar<
for(auto s = self->server_info.begin(); s != self->server_info.end(); ++s) {
auto serverStatus = self->server_status.get( s->second->lastKnownInterface.id() );
if( serverStatus.excludeOnRecruit() ) {
TraceEvent("DDRecruitExcl1").detail("Excluding", s->second->lastKnownInterface.address());
TraceEvent(SevDebug, "DDRecruitExcl1").detail("Excluding", s->second->lastKnownInterface.address());
auto addr = s->second->lastKnownInterface.address();
exclusions.insert( AddressExclusion( addr.ip, addr.port ) );
}
Expand All @@ -1720,7 +1719,7 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection *self, Reference<AsyncVar<
auto excl = self->excludedServers.getKeys();
for(auto& s : excl)
if (self->excludedServers.get(s)) {
TraceEvent("DDRecruitExcl2").detail("Excluding", s.toString());
TraceEvent(SevDebug, "DDRecruitExcl2").detail("Excluding", s.toString());
exclusions.insert( s );
}
rsr.criticalRecruitment = self->healthyTeamCount == 0;
Expand Down
18 changes: 15 additions & 3 deletions fdbserver/DataDistributionQueue.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ struct DDQueueData {
PromiseStream<GetMetricsRequest> getShardMetrics;

double* lastLimited;
double lastInterval;
int suppressIntervals;

DDQueueData( MasterInterface mi, MoveKeysLock lock, Database cx, TeamCollectionInterface teamCollection,
Reference<ShardsAffectedByTeamFailure> sABTF, PromiseStream<Promise<int64_t>> getAverageShardBytes,
Expand All @@ -202,7 +204,7 @@ struct DDQueueData {
shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), mi( mi ), lock( lock ),
cx( cx ), teamSize( teamSize ), durableStorageQuorum( durableStorageQuorum ), input( input ),
getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited) {}
finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited), suppressIntervals(0), lastInterval(0) {}

void validate() {
if( EXPENSIVE_VALIDATION ) {
Expand Down Expand Up @@ -652,9 +654,19 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
state Reference<IDataDistributionTeam> destination;

try {
if(now() - self->lastInterval < 1.0) {
relocateShardInterval.severity = SevDebug;
self->suppressIntervals++;
}

TraceEvent(relocateShardInterval.begin(), masterId)
.detail("KeyBegin", printable(rd.keys.begin)).detail("KeyEnd", printable(rd.keys.end))
.detail("Priority", rd.priority).detail("RelocationID", relocateShardInterval.pairID);
.detail("Priority", rd.priority).detail("RelocationID", relocateShardInterval.pairID).detail("SuppressedEventCount", self->suppressIntervals);

if(relocateShardInterval.severity != SevDebug) {
self->lastInterval = now();
self->suppressIntervals = 0;
}

state StorageMetrics metrics = wait( brokenPromiseToNever( self->getShardMetrics.getReply( GetMetricsRequest( rd.keys ) ) ) );

Expand Down Expand Up @@ -690,7 +702,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd

destination->addDataInFlightToTeam( +metrics.bytes );

TraceEvent("RelocateShardHasDestination", masterId)
TraceEvent(relocateShardInterval.severity, "RelocateShardHasDestination", masterId)
.detail("PairId", relocateShardInterval.pairID)
.detail("DestinationTeam", destination->getDesc());

Expand Down
6 changes: 2 additions & 4 deletions fdbserver/storageserver.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {

break;
} catch (Error& e) {
TraceEvent("FKBlockFail", data->thisServerID).detail("FKID", interval.pairID).error(e,true);
TraceEvent("FKBlockFail", data->thisServerID).detail("FKID", interval.pairID).error(e,true).suppressFor(1.0);
if (e.code() == error_code_transaction_too_old){
TEST(true); // A storage server has forgotten the history data we are fetching
Void _ = wait( delayJittered( FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ) );
Expand Down Expand Up @@ -2290,9 +2290,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
if (now() - waitStartT >= .1) {
TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID)
.detail("Version", data->version.get())
.detail("DurableVersion", data->durableVersion.get())
//.detail("ExtraBytes", usedBytesOlderThanDesiredDurableVersion(data))
;
.detail("DurableVersion", data->durableVersion.get()).suppressFor(1.0);
waitStartT = now();
}

Expand Down
12 changes: 4 additions & 8 deletions fdbserver/workloads/ConsistencyCheck.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,6 @@ struct ConsistencyCheckWorkload : TestWorkload
state Key lastSampleKey;
state Key lastStartSampleKey;
state int64_t totalReadAmount = 0;
state int64_t rangeBytes = 0;

state KeySelector begin = firstGreaterOrEqual(range.begin);

Expand Down Expand Up @@ -726,9 +725,6 @@ struct ConsistencyCheckWorkload : TestWorkload
{
state GetKeyValuesReply current = rangeResult.get();
totalReadAmount += current.data.expectedSize();
if (j == 0) {
rangeBytes += current.data.expectedSize();
}
//If we haven't encountered a valid storage server yet, then mark this as the baseline to compare against
if(firstValidServer == -1)
firstValidServer = j;
Expand Down Expand Up @@ -841,7 +837,7 @@ struct ConsistencyCheckWorkload : TestWorkload
else if(!isRelocating)
{
TraceEvent("ConsistencyCheck_StorageServerUnavailable").detail("StorageServer", storageServers[j]).detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end))
.detail("Address", storageServerInterfaces[j].address()).detail("GetKeyValuesToken", storageServerInterfaces[j].getKeyValues.getEndpoint().token);
.detail("Address", storageServerInterfaces[j].address()).detail("GetKeyValuesToken", storageServerInterfaces[j].getKeyValues.getEndpoint().token).suppressFor(1.0);

//All shards should be available in quiscence
if(self->performQuiescentChecks)
Expand Down Expand Up @@ -915,8 +911,6 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}

TraceEvent("ConsistencyCheck_CheckedRange").detail("Range", printable(range)).detail("Bytes", rangeBytes);

canSplit = canSplit && sampledBytes - splitBytes >= shardBounds.min.bytes && sampledBytes > splitBytes;

//Update the size of all storage servers containing this shard
Expand Down Expand Up @@ -983,7 +977,9 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}

TraceEvent("ConsistencyCheck_ReadRange").detail("range", printable(range)).detail("bytesRead", bytesReadInRange);
if(bytesReadInRange > 0) {
TraceEvent("ConsistencyCheck_ReadRange").detail("range", printable(range)).detail("bytesRead", bytesReadInRange);
}
}

//SOMEDAY: when background data distribution is implemented, include this test
Expand Down
6 changes: 3 additions & 3 deletions flow/Net2.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,15 +412,15 @@ class Connection : public IConnection, ReferenceCounted<Connection> {
boost::system::error_code error;
socket.close(error);
if (error)
TraceEvent(SevWarn, "N2_CloseError", id).detail("Message", error.value());
TraceEvent(SevWarn, "N2_CloseError", id).detail("Message", error.value()).suppressFor(1.0);
}

void onReadError( const boost::system::error_code& error ) {
TraceEvent(SevWarn, "N2_ReadError", id).detail("Message", error.value());
TraceEvent(SevWarn, "N2_ReadError", id).detail("Message", error.value()).suppressFor(1.0);
closeSocket();
}
void onWriteError( const boost::system::error_code& error ) {
TraceEvent(SevWarn, "N2_WriteError", id).detail("Message", error.value());
TraceEvent(SevWarn, "N2_WriteError", id).detail("Message", error.value()).suppressFor(1.0);
closeSocket();
}
};
Expand Down
2 changes: 1 addition & 1 deletion flow/Trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ TraceEvent::TraceEvent(Severity severity, const char* type, const Optional<Stand
}

TraceEvent::TraceEvent( TraceInterval& interval, UID id ) : type(""), id(id) {
init(SevInfo, interval);
init(interval.severity, interval);
detail("ID", id);
}

Expand Down
3 changes: 2 additions & 1 deletion flow/Trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,15 @@ struct TraceEvent {
#endif

struct TraceInterval {
TraceInterval( const char* type ) : count(-1), type(type) {}
TraceInterval( const char* type ) : count(-1), type(type), severity(SevInfo) {}

TraceInterval& begin();
TraceInterval& end() { return *this; }

const char* type;
UID pairID;
int count;
Severity severity;
};

struct LatestEventCache {
Expand Down