Skip to content

Commit

Permalink
Fixing message filter
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Oct 8, 2024
1 parent 6e9d1df commit 723dd66
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 12 deletions.
2 changes: 1 addition & 1 deletion bin/pull_pubsub
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ echo Pulling from subscription $subscription
pull_limit=100

while true; do
date
date -u -Is
gcloud --format=json --project=$project_id pubsub subscriptions pull $subscription --limit $pull_limit --auto-ack > $TMP_FILE || true

for index in $(seq 0 $((pull_limit-1))); do
Expand Down
5 changes: 5 additions & 0 deletions common/src/main/java/com/google/udmi/util/JsonUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.gson.internal.bind.util.ISO8601Utils;
import java.io.File;
import java.nio.file.Files;
import java.time.Instant;
Expand Down Expand Up @@ -205,6 +206,10 @@ public static String isoConvert(Date timestamp) {
}
}

public static String currentIsoMs() {
return ISO8601Utils.format(new Date(), true);
}

/**
* Load a file to given type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public void notice(String message, Object... args) {
@Override
public void output(Level level, String message) {
PrintStream printStream = level.value() >= Level.WARNING.value() ? System.err : System.out;
printStream.printf("%s %s %s: %s %s%n", JsonUtil.isoConvert(), getExecutionContext(),
printStream.printf("%s %s %s: %s %s%n", JsonUtil.currentIsoMs(), getExecutionContext(),
level.name().charAt(0), getSimpleName(), message);
printStream.flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,13 @@ private static IotReflectorClient getAlternateClient() {
altConfiguration.udmi_namespace = exeConfig.udmi_namespace;
altConfiguration.alt_registry = null;

return getReflectorClient(altConfiguration);
}

private static IotReflectorClient getReflectorClient(ExecutionConfiguration altConfiguration) {
try {
return new IotReflectorClient(altConfiguration, getRequiredFunctionsVersion());
return new IotReflectorClient(altConfiguration, getRequiredFunctionsVersion(),
SequenceBase::messageFilter);
} catch (Exception e) {
System.err.println(
"Could not connect to alternate registry, disabling: " + friendlyStackTrace(e));
Expand All @@ -408,6 +413,10 @@ private static IotReflectorClient getAlternateClient() {
}
}

private static boolean messageFilter(Envelope envelope) {
return true;
}

private static int getRequiredFunctionsVersion() {
return SequenceRunner.processStage(ALPHA) ? SEQUENCER_FUNCTIONS_ALPHA
: SEQUENCER_FUNCTIONS_VERSION;
Expand Down Expand Up @@ -561,7 +570,7 @@ private static MessagePublisher getReflectorClient() {
format("IoT Provider '%s' not supported, should be one of: %s", exeConfig.iot_provider,
CSV_JOINER.join(SEQUENCER_PROVIDERS)));
}
return new IotReflectorClient(exeConfig, getRequiredFunctionsVersion());
return getReflectorClient(exeConfig);
}

private static MessagePublisher altReflector() {
Expand Down Expand Up @@ -1191,6 +1200,7 @@ protected void updateConfig(String reason) {

private void updateConfig(String reason, boolean force) {
assertConfigIsNotPending();

// Add a forced sleep to make sure second-quantized timestamps are unique.
safeSleep(CONFIG_BARRIER_MS);

Expand Down Expand Up @@ -1774,11 +1784,8 @@ private synchronized void handleUpdateMessage(String subTypeRaw,
}
List<DiffEntry> changes = updateDeviceConfig(config);
debug(format("Updated config %s %s", isoConvert(config.timestamp), txnId));
if (updateCount == 1) {
info(format("Initial config #%03d", updateCount), stringify(deviceConfig));
} else {
info(format("Updated config #%03d", updateCount), changedLines(changes));
}
String changeUpdate = updateCount == 1 ? stringify(deviceConfig) : changedLines(changes);
info(format("Updated config #%03d", updateCount), changeUpdate);
} else if (converted instanceof State convertedState) {
String timestamp = isoConvert(convertedState.timestamp);
if (convertedState.timestamp == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
import static com.google.udmi.util.JsonUtil.isoConvert;
import static com.google.udmi.util.JsonUtil.stringify;
import static com.google.udmi.util.JsonUtil.stringifyTerse;
import static com.google.udmi.util.JsonUtil.toStringMap;
import static com.google.udmi.util.JsonUtil.writeFile;
import static java.lang.String.format;
import static java.time.Instant.ofEpochSecond;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -43,7 +41,6 @@
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SeekRequest;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
Expand Down

0 comments on commit 723dd66

Please sign in to comment.