Skip to content

Commit

Permalink
Merge pull request #1078 from mattrjacobs/move-deduping-logic-from-js…
Browse files Browse the repository at this point in the history
…on-to-core

Move logic for deduplicating commands in a request from the JSON serialization to the model object
  • Loading branch information
mattrjacobs committed Jan 30, 2016
2 parents fbed295 + 3c4bda7 commit 6b97ee5
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,14 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.hystrix.ExecutionResult;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixInvokableInfo;
import com.netflix.hystrix.metric.HystrixRequestEvents;
import com.netflix.hystrix.metric.HystrixRequestEventsStream;
import rx.Observable;

import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -66,69 +61,19 @@ public static String convertRequestToJson(HystrixRequestEvents request) throws I

private static void writeRequestAsJson(JsonGenerator json, HystrixRequestEvents request) throws IOException {
json.writeStartArray();
Map<CommandAndCacheKey, Integer> cachingDetector = new HashMap<CommandAndCacheKey, Integer>();
List<HystrixInvokableInfo<?>> nonCachedExecutions = new ArrayList<HystrixInvokableInfo<?>>(request.getExecutions().size());
for (HystrixInvokableInfo<?> execution: request.getExecutions()) {
if (execution.getPublicCacheKey() != null) {
//eligible for caching - might be the initial, or might be from cache
CommandAndCacheKey key = new CommandAndCacheKey(execution.getCommandKey().name(), execution.getPublicCacheKey());
Integer count = cachingDetector.get(key);
if (count != null) {
//key already seen
cachingDetector.put(key, count + 1);
} else {
//key not seen yet
cachingDetector.put(key, 0);
}
}
if (!execution.isResponseFromCache()) {
nonCachedExecutions.add(execution);
}
}

Map<ExecutionSignature, List<Integer>> commandDeduper = new HashMap<ExecutionSignature, List<Integer>>();
for (HystrixInvokableInfo<?> execution: nonCachedExecutions) {
int cachedCount = 0;
String cacheKey = null;
if (execution.getPublicCacheKey() != null) {
cacheKey = execution.getPublicCacheKey();
CommandAndCacheKey key = new CommandAndCacheKey(execution.getCommandKey().name(), cacheKey);
cachedCount = cachingDetector.get(key);
}
ExecutionSignature signature;
HystrixCollapserKey collapserKey = execution.getOriginatingCollapserKey();
int collapserBatchCount = execution.getNumberCollapsed();
if (cachedCount > 0) {
//this has a RESPONSE_FROM_CACHE and needs to get split off
signature = ExecutionSignature.from(execution, cacheKey, cachedCount);
} else {
//nothing cached from this, can collapse further
signature = ExecutionSignature.from(execution);
}
List<Integer> currentLatencyList = commandDeduper.get(signature);
if (currentLatencyList != null) {
currentLatencyList.add(execution.getExecutionTimeInMilliseconds());
} else {
List<Integer> newLatencyList = new ArrayList<Integer>();
newLatencyList.add(execution.getExecutionTimeInMilliseconds());
commandDeduper.put(signature, newLatencyList);
}
}

for (Map.Entry<ExecutionSignature, List<Integer>> entry: commandDeduper.entrySet()) {
ExecutionSignature executionSignature = entry.getKey();
List<Integer> latencies = entry.getValue();
convertExecutionToJson(json, executionSignature, latencies);
for (Map.Entry<HystrixRequestEvents.ExecutionSignature, List<Integer>> entry: request.getExecutionsMappedToLatencies().entrySet()) {
convertExecutionToJson(json, entry.getKey(), entry.getValue());
}

json.writeEndArray();
}

private static void convertExecutionToJson(JsonGenerator json, ExecutionSignature executionSignature, List<Integer> latencies) throws IOException {
private static void convertExecutionToJson(JsonGenerator json, HystrixRequestEvents.ExecutionSignature executionSignature, List<Integer> latencies) throws IOException {
json.writeStartObject();
json.writeStringField("name", executionSignature.commandName);
json.writeStringField("name", executionSignature.getCommandName());
json.writeArrayFieldStart("events");
ExecutionResult.EventCounts eventCounts = executionSignature.eventCounts;
ExecutionResult.EventCounts eventCounts = executionSignature.getEventCounts();
for (HystrixEventType eventType: HystrixEventType.values()) {
if (!eventType.equals(HystrixEventType.COLLAPSED)) {
if (eventCounts.contains(eventType)) {
Expand All @@ -150,99 +95,15 @@ private static void convertExecutionToJson(JsonGenerator json, ExecutionSignatur
json.writeNumber(latency);
}
json.writeEndArray();
if (executionSignature.cachedCount > 0) {
json.writeNumberField("cached", executionSignature.cachedCount);
if (executionSignature.getCachedCount() > 0) {
json.writeNumberField("cached", executionSignature.getCachedCount());
}
if (executionSignature.eventCounts.contains(HystrixEventType.COLLAPSED)) {
if (executionSignature.getEventCounts().contains(HystrixEventType.COLLAPSED)) {
json.writeObjectFieldStart("collapsed");
json.writeStringField("name", executionSignature.collapserKey.name());
json.writeNumberField("count", executionSignature.collapserBatchSize);
json.writeStringField("name", executionSignature.getCollapserKey().name());
json.writeNumberField("count", executionSignature.getCollapserBatchSize());
json.writeEndObject();
}
json.writeEndObject();
}

private static class CommandAndCacheKey {
private final String commandName;
private final String cacheKey;

public CommandAndCacheKey(String commandName, String cacheKey) {
this.commandName = commandName;
this.cacheKey = cacheKey;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CommandAndCacheKey that = (CommandAndCacheKey) o;

if (!commandName.equals(that.commandName)) return false;
return cacheKey.equals(that.cacheKey);

}

@Override
public int hashCode() {
int result = commandName.hashCode();
result = 31 * result + cacheKey.hashCode();
return result;
}

@Override
public String toString() {
return "CommandAndCacheKey{" +
"commandName='" + commandName + '\'' +
", cacheKey='" + cacheKey + '\'' +
'}';
}
}

private static class ExecutionSignature {
private final String commandName;
private final ExecutionResult.EventCounts eventCounts;
private final String cacheKey;
private final int cachedCount;
private final HystrixCollapserKey collapserKey;
private final int collapserBatchSize;

private ExecutionSignature(HystrixCommandKey commandKey, ExecutionResult.EventCounts eventCounts, String cacheKey, int cachedCount, HystrixCollapserKey collapserKey, int collapserBatchSize) {
this.commandName = commandKey.name();
this.eventCounts = eventCounts;
this.cacheKey = cacheKey;
this.cachedCount = cachedCount;
this.collapserKey = collapserKey;
this.collapserBatchSize = collapserBatchSize;
}

public static ExecutionSignature from(HystrixInvokableInfo<?> execution) {
return new ExecutionSignature(execution.getCommandKey(), execution.getEventCounts(), null, 0, execution.getOriginatingCollapserKey(), execution.getNumberCollapsed());
}

public static ExecutionSignature from(HystrixInvokableInfo<?> execution, String cacheKey, int cachedCount) {
return new ExecutionSignature(execution.getCommandKey(), execution.getEventCounts(), cacheKey, cachedCount, execution.getOriginatingCollapserKey(), execution.getNumberCollapsed());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ExecutionSignature that = (ExecutionSignature) o;

if (!commandName.equals(that.commandName)) return false;
if (!eventCounts.equals(that.eventCounts)) return false;
return !(cacheKey != null ? !cacheKey.equals(that.cacheKey) : that.cacheKey != null);

}

@Override
public int hashCode() {
int result = commandName.hashCode();
result = 31 * result + eventCounts.hashCode();
result = 31 * result + (cacheKey != null ? cacheKey.hashCode() : 0);
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
*/
package com.netflix.hystrix.metric;

import com.netflix.hystrix.ExecutionResult;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixInvokableInfo;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HystrixRequestEvents {

private final Collection<HystrixInvokableInfo<?>> executions;

public HystrixRequestEvents(Collection<HystrixInvokableInfo<?>> executions) {
Expand All @@ -30,4 +36,162 @@ public HystrixRequestEvents(Collection<HystrixInvokableInfo<?>> executions) {
public Collection<HystrixInvokableInfo<?>> getExecutions() {
return executions;
}

public Map<ExecutionSignature, List<Integer>> getExecutionsMappedToLatencies() {
Map<CommandAndCacheKey, Integer> cachingDetector = new HashMap<CommandAndCacheKey, Integer>();
List<HystrixInvokableInfo<?>> nonCachedExecutions = new ArrayList<HystrixInvokableInfo<?>>(executions.size());
for (HystrixInvokableInfo<?> execution: executions) {
if (execution.getPublicCacheKey() != null) {
//eligible for caching - might be the initial, or might be from cache
CommandAndCacheKey key = new CommandAndCacheKey(execution.getCommandKey().name(), execution.getPublicCacheKey());
Integer count = cachingDetector.get(key);
if (count != null) {
//key already seen
cachingDetector.put(key, count + 1);
} else {
//key not seen yet
cachingDetector.put(key, 0);
}
}
if (!execution.isResponseFromCache()) {
nonCachedExecutions.add(execution);
}
}

Map<ExecutionSignature, List<Integer>> commandDeduper = new HashMap<ExecutionSignature, List<Integer>>();
for (HystrixInvokableInfo<?> execution: nonCachedExecutions) {
int cachedCount = 0;
String cacheKey = null;
if (execution.getPublicCacheKey() != null) {
cacheKey = execution.getPublicCacheKey();
CommandAndCacheKey key = new CommandAndCacheKey(execution.getCommandKey().name(), cacheKey);
cachedCount = cachingDetector.get(key);
}
ExecutionSignature signature;
HystrixCollapserKey collapserKey = execution.getOriginatingCollapserKey();
int collapserBatchCount = execution.getNumberCollapsed();
if (cachedCount > 0) {
//this has a RESPONSE_FROM_CACHE and needs to get split off
signature = ExecutionSignature.from(execution, cacheKey, cachedCount);
} else {
//nothing cached from this, can collapse further
signature = ExecutionSignature.from(execution);
}
List<Integer> currentLatencyList = commandDeduper.get(signature);
if (currentLatencyList != null) {
currentLatencyList.add(execution.getExecutionTimeInMilliseconds());
} else {
List<Integer> newLatencyList = new ArrayList<Integer>();
newLatencyList.add(execution.getExecutionTimeInMilliseconds());
commandDeduper.put(signature, newLatencyList);
}
}

return commandDeduper;
}

private static class CommandAndCacheKey {
private final String commandName;
private final String cacheKey;

public CommandAndCacheKey(String commandName, String cacheKey) {
this.commandName = commandName;
this.cacheKey = cacheKey;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CommandAndCacheKey that = (CommandAndCacheKey) o;

if (!commandName.equals(that.commandName)) return false;
return cacheKey.equals(that.cacheKey);

}

@Override
public int hashCode() {
int result = commandName.hashCode();
result = 31 * result + cacheKey.hashCode();
return result;
}

@Override
public String toString() {
return "CommandAndCacheKey{" +
"commandName='" + commandName + '\'' +
", cacheKey='" + cacheKey + '\'' +
'}';
}
}

public static class ExecutionSignature {
private final String commandName;
private final ExecutionResult.EventCounts eventCounts;
private final String cacheKey;
private final int cachedCount;
private final HystrixCollapserKey collapserKey;
private final int collapserBatchSize;

private ExecutionSignature(HystrixCommandKey commandKey, ExecutionResult.EventCounts eventCounts, String cacheKey, int cachedCount, HystrixCollapserKey collapserKey, int collapserBatchSize) {
this.commandName = commandKey.name();
this.eventCounts = eventCounts;
this.cacheKey = cacheKey;
this.cachedCount = cachedCount;
this.collapserKey = collapserKey;
this.collapserBatchSize = collapserBatchSize;
}

public static ExecutionSignature from(HystrixInvokableInfo<?> execution) {
return new ExecutionSignature(execution.getCommandKey(), execution.getEventCounts(), null, 0, execution.getOriginatingCollapserKey(), execution.getNumberCollapsed());
}

public static ExecutionSignature from(HystrixInvokableInfo<?> execution, String cacheKey, int cachedCount) {
return new ExecutionSignature(execution.getCommandKey(), execution.getEventCounts(), cacheKey, cachedCount, execution.getOriginatingCollapserKey(), execution.getNumberCollapsed());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ExecutionSignature that = (ExecutionSignature) o;

if (!commandName.equals(that.commandName)) return false;
if (!eventCounts.equals(that.eventCounts)) return false;
return !(cacheKey != null ? !cacheKey.equals(that.cacheKey) : that.cacheKey != null);

}

@Override
public int hashCode() {
int result = commandName.hashCode();
result = 31 * result + eventCounts.hashCode();
result = 31 * result + (cacheKey != null ? cacheKey.hashCode() : 0);
return result;
}

public String getCommandName() {
return commandName;
}

public ExecutionResult.EventCounts getEventCounts() {
return eventCounts;
}

public int getCachedCount() {
return cachedCount;
}


public HystrixCollapserKey getCollapserKey() {
return collapserKey;
}

public int getCollapserBatchSize() {
return collapserBatchSize;
}
}
}

0 comments on commit 6b97ee5

Please sign in to comment.