From 0a51d058d86ba8d9644737846c3d2cc0b24260d2 Mon Sep 17 00:00:00 2001 From: Trevor Date: Fri, 4 Oct 2024 09:55:07 -0700 Subject: [PATCH] Make validator work with //pubsub/ project spec (#977) --- .wordlist.txt | 2 + .../java/com/google/udmi/util/Common.java | 4 +- docs/tools/project_spec.md | 45 ++++++++++ docs/tools/readme.md | 5 ++ .../google/daq/mqtt/registrar/Registrar.java | 7 +- .../google/daq/mqtt/util/CloudIotManager.java | 49 ++++++----- .../google/daq/mqtt/util/IotMockProvider.java | 1 + .../daq/mqtt/util/IotReflectorClient.java | 1 + .../daq/mqtt/util/MessagePublisher.java | 15 ++-- .../google/daq/mqtt/util/PubSubClient.java | 82 +++++++++++++++++-- .../google/daq/mqtt/validator/Validator.java | 26 ++++-- .../{daq/mqtt => udmi}/util/IotProvider.java | 2 +- 12 files changed, 190 insertions(+), 49 deletions(-) create mode 100644 docs/tools/project_spec.md rename validator/src/main/java/com/google/{daq/mqtt => udmi}/util/IotProvider.java (98%) diff --git a/.wordlist.txt b/.wordlist.txt index d38c986f66..cea0b8e201 100644 --- a/.wordlist.txt +++ b/.wordlist.txt @@ -93,6 +93,7 @@ md modbus MQTT mydomain +namespace niversal npm nterface @@ -118,6 +119,7 @@ proxying PUBACKs pubber PubSub +pubsub QoS Readme repo diff --git a/common/src/main/java/com/google/udmi/util/Common.java b/common/src/main/java/com/google/udmi/util/Common.java index 4099fa5a70..ac16e89d7f 100644 --- a/common/src/main/java/com/google/udmi/util/Common.java +++ b/common/src/main/java/com/google/udmi/util/Common.java @@ -58,7 +58,7 @@ public abstract class Common { public static final String SOURCE_SEPARATOR = "+"; public static final String SOURCE_SEPARATOR_REGEX = "\\" + SOURCE_SEPARATOR; - private static final String PREFIX_SEPARATOR = "~"; + public static final String NAMESPACE_SEPARATOR = "~"; private static final String UDMI_VERSION_ENV = "UDMI_TOOLS"; public static final int EXIT_CODE_ERROR = 1; @@ -179,6 +179,6 @@ private static String getExceptionMessage(Throwable exception, Class containe } public static String getNamespacePrefix(String udmiNamespace) { - return Strings.isNullOrEmpty(udmiNamespace) ? "" : udmiNamespace + PREFIX_SEPARATOR; + return Strings.isNullOrEmpty(udmiNamespace) ? "" : udmiNamespace + NAMESPACE_SEPARATOR; } } diff --git a/docs/tools/project_spec.md b/docs/tools/project_spec.md new file mode 100644 index 0000000000..deee1abd91 --- /dev/null +++ b/docs/tools/project_spec.md @@ -0,0 +1,45 @@ +[**UDMI**](../../) / [**Docs**](../) / [**Tools**](./) / [Project Spec](#) + +# Project Specification + +`//provider/project[/namespace][+user]` + +* `provider` + * `gbos` + * `pref` + * `mqtt` + * `pubsub` +* `project` + * GCP project + * IoT Core project +* `namespace` + * default + * k8s namespace + * pubsub namespace +* `user` + * canonical users + * individual users + +* Examples + +`//gbos/bos-platform-dev` +`//gbos/bos-platform-dev/peringknife` +`//mqtt/localhost` +`//pubsub/bos-platform-dev/peringknife+debug` +`//pref/bos-platform-dev` + +* Tool Support + +* registrar + * not `pubsub` +* validator + * allows `pubsub` +* sequencer + * +* pull_messages + * `mqtt` + * `pubsub` + + + + diff --git a/docs/tools/readme.md b/docs/tools/readme.md index d7e15fc93c..d3dd6e6fcf 100644 --- a/docs/tools/readme.md +++ b/docs/tools/readme.md @@ -1,5 +1,10 @@ [**UDMI**](../../) / [**Docs**](../) / [Tools](#) +# Common Arguments + +Most tools take a [site model](../specs/site_model.md) as the first argument. +Many tools take a [project spec](project_spec.md) as the second argument. + # Tools - [keygen](keygen.md) - a script to generate an RSA or ES key for single devices diff --git a/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java b/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java index dfa2147bf8..113a6bfbfe 100644 --- a/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java +++ b/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java @@ -1,6 +1,7 @@ package com.google.daq.mqtt.registrar; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Sets.difference; import static com.google.common.collect.Sets.intersection; import static com.google.daq.mqtt.util.ConfigUtil.UDMI_ROOT; @@ -42,7 +43,6 @@ import com.github.fge.jsonschema.main.JsonSchema; import com.github.fge.jsonschema.main.JsonSchemaFactory; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; @@ -470,6 +470,9 @@ private void initializeCloudProject() { cloudIotManager.getCloudRegion(), cloudIotManager.getRegistryId()); + checkState(cloudIotManager.canUpdateCloud(), + "iot provider not properly initialized, can not update cloud"); + if (cloudIotManager.getUpdateTopic() != null) { updatePusher = new PubSubPusher(projectId, cloudIotManager.getUpdateTopic()); } @@ -1011,7 +1014,7 @@ private Set> getBindings(Set deviceSet, LocalDevic System.err.printf("Binding devices to %s, already bound: %s%n", gatewayId, JOIN_CSV.join(boundDevices)); int total = cloudModels.size() != 0 ? cloudModels.size() : localDevices.size(); - Preconditions.checkState(boundDevices.size() != total, + checkState(boundDevices.size() != total, "all devices including the gateway can't be bound to one gateway!"); return localDevice.getSettings().proxyDevices.stream() .filter(proxyDevice -> deviceSet == null || deviceSet.contains(proxyDevice)) diff --git a/validator/src/main/java/com/google/daq/mqtt/util/CloudIotManager.java b/validator/src/main/java/com/google/daq/mqtt/util/CloudIotManager.java index 5e02e655cb..a37b51bdfe 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/CloudIotManager.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/CloudIotManager.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.google.udmi.util.GeneralUtils; +import com.google.udmi.util.IotProvider; import com.google.udmi.util.MetadataMapKeys; import com.google.udmi.util.SiteModel; import java.io.File; @@ -173,7 +174,9 @@ private static Resource_type gatewayIfTrue(boolean isGateway) { private void initializeIotProvider() { try { iotProvider = makeIotProvider(); - System.err.println("Created service for project " + projectId); + System.err.printf( + "Instantiated iot provider %s as %s%n", executionConfiguration.iot_provider, + ofNullable(iotProvider).map(p -> p.getClass().getSimpleName()).orElse("undefined")); } catch (Exception e) { throw new RuntimeException("While initializing Cloud IoT project " + projectId, e); } @@ -217,7 +220,7 @@ public boolean registerDevice(String deviceId, CloudDeviceSettings settings) { } else { exceptions.capture("updating", () -> updateDevice(deviceId, settings, device)); } - + if (settings.config != null) { exceptions.capture("configuring", () -> writeDeviceConfig(deviceId, settings.config)); } @@ -254,11 +257,11 @@ public CloudModel getRegisteredDevice(String deviceId) { } private void writeDeviceConfig(String deviceId, String config) { - iotProvider.updateConfig(deviceId, SubFolder.UPDATE, config); + getIotProvider().updateConfig(deviceId, SubFolder.UPDATE, config); } public void modifyConfig(String deviceId, SubFolder subFolder, String config) { - iotProvider.updateConfig(deviceId, subFolder, config); + getIotProvider().updateConfig(deviceId, subFolder, config); } /** @@ -268,7 +271,7 @@ public void modifyConfig(String deviceId, SubFolder subFolder, String config) { * @param blocked should this device be blocked? */ public void blockDevice(String deviceId, boolean blocked) { - iotProvider.setBlocked(deviceId, blocked); + getIotProvider().setBlocked(deviceId, blocked); } private CloudModel makeDevice(CloudDeviceSettings settings, CloudModel oldDevice) { @@ -304,14 +307,14 @@ private List getCredentials(CloudDeviceSettings settings) { private void createDevice(String deviceId, CloudDeviceSettings settings) { CloudModel newDevice = makeDevice(settings, null); limitValueSizes(newDevice.metadata); - iotProvider.createResource(deviceId, newDevice); + getIotProvider().createResource(deviceId, newDevice); deviceMap.put(deviceId, newDevice); } private void updateDevice(String deviceId, CloudDeviceSettings settings, CloudModel oldDevice) { CloudModel device = makeDevice(settings, oldDevice); limitValueSizes(device.metadata); - iotProvider.updateDevice(deviceId, device); + getIotProvider().updateDevice(deviceId, device); } /** @@ -320,7 +323,7 @@ private void updateDevice(String deviceId, CloudDeviceSettings settings, CloudMo public void modifyDevice(String deviceId, CloudModel update) { limitValueSizes(update.metadata); update.operation = Operation.MODIFY; - iotProvider.updateDevice(deviceId, update); + getIotProvider().updateDevice(deviceId, update); } private void limitValueSizes(Map metadata) { @@ -330,7 +333,7 @@ private void limitValueSizes(Map metadata) { } public SetupUdmiConfig getVersionInformation() { - return iotProvider.getVersionInformation(); + return getIotProvider().getVersionInformation(); } /** @@ -339,7 +342,7 @@ public SetupUdmiConfig getVersionInformation() { * @return registered device list */ public Map fetchCloudModels() { - return iotProvider.fetchCloudModels(null); + return getIotProvider().fetchCloudModels(null); } /** @@ -348,7 +351,7 @@ public Map fetchCloudModels() { * @return registered device list */ public Set fetchBoundDevices(String gatewayId) { - return ifNotNullGet(iotProvider.fetchCloudModels(gatewayId), Map::keySet); + return ifNotNullGet(getIotProvider().fetchCloudModels(gatewayId), Map::keySet); } public CloudModel fetchDevice(String deviceId) { @@ -356,7 +359,7 @@ public CloudModel fetchDevice(String deviceId) { } private CloudModel fetchDeviceRaw(String deviceId) { - return iotProvider.fetchDevice(deviceId); + return getIotProvider().fetchDevice(deviceId); } /** @@ -405,19 +408,19 @@ public Object getCloudRegion() { } public void bindDevice(String proxyDeviceId, String gatewayDeviceId) { - iotProvider.bindDeviceToGateway(proxyDeviceId, gatewayDeviceId); + getIotProvider().bindDeviceToGateway(proxyDeviceId, gatewayDeviceId); } public List getMockActions() { - return iotProvider.getMockActions(); + return getIotProvider().getMockActions(); } public void shutdown() { - iotProvider.shutdown(); + ifNotNullThen(iotProvider, IotProvider::shutdown); } public void deleteDevice(String deviceId) { - iotProvider.deleteDevice(deviceId); + getIotProvider().deleteDevice(deviceId); deviceMap.remove(deviceId); } @@ -427,8 +430,8 @@ public void deleteDevice(String deviceId) { public String createRegistry(String suffix) { CloudModel settings = new CloudModel(); settings.resource_type = Resource_type.REGISTRY; - settings.credentials = List.of(iotProvider.getCredential()); - iotProvider.createResource(suffix, settings); + settings.credentials = List.of(getIotProvider().getCredential()); + getIotProvider().createResource(suffix, settings); return requireNonNull(settings.num_id, "Missing registry name in reply"); } @@ -445,10 +448,18 @@ public void updateRegistry(SiteMetadata siteMetadata) { registryModel.metadata.put( MetadataMapKeys.UDMI_METADATA, toJsonString(siteMetadata) ); - iotProvider.updateRegistry(registryModel); + getIotProvider().updateRegistry(registryModel); } public String getSiteDir() { return executionConfiguration.site_model; } + + private IotProvider getIotProvider() { + return checkNotNull(iotProvider, "iot provider not properly initialized"); + } + + public boolean canUpdateCloud() { + return iotProvider != null; + } } diff --git a/validator/src/main/java/com/google/daq/mqtt/util/IotMockProvider.java b/validator/src/main/java/com/google/daq/mqtt/util/IotMockProvider.java index 831644e257..050154df04 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/IotMockProvider.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/IotMockProvider.java @@ -10,6 +10,7 @@ import static com.google.daq.mqtt.util.IotMockProvider.ActionType.UPDATE_REGISTRY_ACTION; import static udmi.schema.CloudModel.Resource_type.GATEWAY; +import com.google.udmi.util.IotProvider; import com.google.udmi.util.SiteModel; import java.util.ArrayList; import java.util.HashMap; diff --git a/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java b/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java index 8d9ed3a2e5..2fc46b3287 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/IotReflectorClient.java @@ -22,6 +22,7 @@ import com.google.daq.mqtt.util.MessagePublisher.QuerySpeed; import com.google.daq.mqtt.validator.Validator; import com.google.daq.mqtt.validator.Validator.MessageBundle; +import com.google.udmi.util.IotProvider; import com.google.udmi.util.SiteModel; import java.io.File; import java.io.PrintWriter; diff --git a/validator/src/main/java/com/google/daq/mqtt/util/MessagePublisher.java b/validator/src/main/java/com/google/daq/mqtt/util/MessagePublisher.java index 46776ace21..43e0cc1d27 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/MessagePublisher.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/MessagePublisher.java @@ -4,7 +4,6 @@ import static java.util.Optional.ofNullable; import static udmi.schema.IotAccess.IotProvider.IMPLICIT; import static udmi.schema.IotAccess.IotProvider.JWT; -import static udmi.schema.IotAccess.IotProvider.PREF; import com.google.bos.iot.core.proxy.MqttPublisher; import com.google.daq.mqtt.validator.Validator.MessageBundle; @@ -13,7 +12,7 @@ import java.util.function.Consumer; import udmi.schema.Credential; import udmi.schema.ExecutionConfiguration; -import udmi.schema.IotAccess; +import udmi.schema.IotAccess.IotProvider; import udmi.schema.SetupUdmiConfig; /** @@ -34,14 +33,16 @@ static String getRegistryId(ExecutionConfiguration config) { */ static MessagePublisher from(ExecutionConfiguration iotConfig, BiConsumer messageHandler, Consumer errorHandler) { - IotAccess.IotProvider iotProvider = ofNullable(iotConfig.iot_provider).orElse(JWT); + IotProvider iotProvider = ofNullable(iotConfig.iot_provider).orElse(JWT); if (iotConfig.reflector_endpoint != null && iotProvider != IMPLICIT) { iotConfig.reflector_endpoint = null; } - if (iotProvider == PREF) { - return PubSubReflector.from(iotConfig, messageHandler, errorHandler); - } - return MqttPublisher.from(iotConfig, messageHandler, errorHandler); + return switch (iotProvider) { + case PREF -> PubSubReflector.from(iotConfig, messageHandler, errorHandler); + case MQTT, JWT, GBOS -> MqttPublisher.from(iotConfig, messageHandler, errorHandler); + case PUBSUB -> PubSubClient.from(iotConfig, messageHandler, errorHandler); + default -> throw new RuntimeException("Unsupported iot provider " + iotProvider); + }; } String publish(String deviceId, String topic, String data); diff --git a/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java b/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java index 2d141dd109..015c82297d 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/PubSubClient.java @@ -1,12 +1,19 @@ package com.google.daq.mqtt.util; import static com.google.api.client.util.Preconditions.checkNotNull; +import static com.google.udmi.util.Common.NAMESPACE_SEPARATOR; import static com.google.udmi.util.Common.PUBLISH_TIME_KEY; -import static com.google.udmi.util.GeneralUtils.encodeBase64; +import static com.google.udmi.util.Common.SOURCE_SEPARATOR; import static com.google.udmi.util.GeneralUtils.getTimestamp; +import static com.google.udmi.util.GeneralUtils.ifNotNullGet; +import static com.google.udmi.util.GeneralUtils.ifNotNullThrow; import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.stringify; +import static com.google.udmi.util.JsonUtil.toStringMap; +import static com.google.udmi.util.PubSubReflector.USER_NAME_DEFAULT; +import static java.lang.String.format; import static java.time.Instant.ofEpochSecond; +import static java.util.Optional.ofNullable; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.core.JsonProcessingException; @@ -16,6 +23,7 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.NotFoundException; import com.google.bos.iot.core.proxy.IotReflectorClient; +import com.google.cloud.Tuple; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Publisher; @@ -33,6 +41,7 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.SeekRequest; import com.google.udmi.util.Common; +import com.google.udmi.util.GeneralUtils; import com.google.udmi.util.JsonUtil; import java.util.AbstractMap.SimpleEntry; import java.util.Arrays; @@ -44,9 +53,11 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.Consumer; import udmi.schema.Envelope; import udmi.schema.Envelope.SubFolder; import udmi.schema.Envelope.SubType; +import udmi.schema.ExecutionConfiguration; import udmi.schema.SetupUdmiConfig; import udmi.schema.SystemState; @@ -64,6 +75,7 @@ public class PubSubClient implements MessagePublisher, MessageHandler { private static final long SUBSCRIPTION_RACE_DELAY_MS = 10000; private static final String WAS_BASE_64 = "wasBase64"; + public static final String SUBSCRIPTION_ROOT = "udmi_target"; private final AtomicBoolean active = new AtomicBoolean(); private final BlockingQueue messages = new LinkedBlockingDeque<>(); @@ -78,6 +90,7 @@ public class PubSubClient implements MessagePublisher, MessageHandler { private final Map> handlers = new HashMap<>(); private final BiMap> typeClasses = HashBiMap.create(); private final Map, SimpleEntry> classTypes = new HashMap<>(); + private final boolean useReflector; /** * Create a simple proxy instance. @@ -86,7 +99,7 @@ public class PubSubClient implements MessagePublisher, MessageHandler { * @param subscription target subscription name */ public PubSubClient(String projectId, String subscription) { - this(projectId, null, subscription, null); + this(projectId, null, subscription, null, false, false); } /** @@ -96,10 +109,11 @@ public PubSubClient(String projectId, String subscription) { * @param registryId target registry id * @param subscription target subscription name * @param updateTopic output PubSub topic for updates (else null) + * @param reflect if output messages should be encapsulated */ public PubSubClient(String projectId, String registryId, String subscription, - String updateTopic) { - this(projectId, registryId, subscription, updateTopic, true); + String updateTopic, boolean reflect) { + this(projectId, registryId, subscription, updateTopic, reflect, true); } /** @@ -109,13 +123,15 @@ public PubSubClient(String projectId, String registryId, String subscription, * @param registryId target registry id * @param subscription target subscription name * @param updateTopic output PubSub topic for updates (else null) + * @param reflect if output messages should be encapsulated * @param reset if the connection should be reset before use */ public PubSubClient(String projectId, String registryId, String subscription, String updateTopic, - boolean reset) { + boolean reflect, boolean reset) { try { this.projectId = checkNotNull(projectId, "project id not defined"); this.registryId = registryId; + this.useReflector = reflect; ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscription); this.flushSubscription = reset; @@ -137,10 +153,33 @@ public PubSubClient(String projectId, String registryId, String subscription, St active.set(true); } catch (Exception e) { - throw new RuntimeException(String.format(CONNECT_ERROR_FORMAT, projectId), e); + throw new RuntimeException(format(CONNECT_ERROR_FORMAT, projectId), e); } } + /** + * Factory method for a client from a configuration. + */ + public static MessagePublisher from(ExecutionConfiguration iotConfig, + BiConsumer messageHandler, Consumer errorHandler) { + ifNotNullThrow(messageHandler, "message handler should be null"); + ifNotNullThrow(errorHandler, "error handler should be null"); + Tuple t = getFeedInfo(iotConfig); + return new PubSubClient(iotConfig.project_id, iotConfig.registry_id, t.x(), t.y(), false); + } + + /** + * Get the information for pubsub subscription and topic, extracted from the configuration. + */ + public static Tuple getFeedInfo(ExecutionConfiguration iotConfig) { + String namespace = ofNullable(iotConfig.udmi_namespace).map(p -> p + NAMESPACE_SEPARATOR) + .orElse(""); + String topic = namespace + SUBSCRIPTION_ROOT; + String userName = SOURCE_SEPARATOR + ofNullable(iotConfig.user_name).orElse(USER_NAME_DEFAULT); + String subscription = topic + userName; + return Tuple.of(subscription, topic); + } + private void initializeHandlerTypes() { Arrays.stream(SubType.values()).forEach(type -> Arrays.stream(SubFolder.values()) .forEach(folder -> registerHandlerType(type, folder))); @@ -289,6 +328,31 @@ private String getMapKey(SubType subType, SubFolder subFolder) { @Override public String publish(String deviceId, String topic, String data) { + return useReflector ? publishReflector(deviceId, topic, data) + : publishDirect(deviceId, topic, data); + } + + private String publishDirect(String deviceId, String topic, String data) { + try { + if (deviceId == null) { + System.err.printf("Refusing to publish to %s due to unspecified device%n", topic); + return null; + } + Envelope envelopedData = makeReflectorMessage(deviceId, topic, null); + PubsubMessage message = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(data)) + .putAllAttributes(toStringMap(envelopedData)) + .build(); + ApiFuture publish = publisher.publish(message); + publish.get(); // Wait for publish to complete. + System.err.printf("Published to %s/%s/%s%n", registryId, deviceId, topic); + return null; + } catch (Exception e) { + throw new RuntimeException("While publishing direct message", e); + } + } + + private String publishReflector(String deviceId, String topic, String data) { try { if (deviceId == null) { System.err.printf("Refusing to publish to %s due to unspecified device%n", topic); @@ -308,7 +372,7 @@ public String publish(String deviceId, String topic, String data) { System.err.printf("Published to %s/%s/%s%n", registryId, deviceId, topic); return null; } catch (Exception e) { - throw new RuntimeException("While publishing message", e); + throw new RuntimeException("While publishing reflector message", e); } } @@ -316,7 +380,7 @@ private Envelope makeReflectorMessage(String deviceId, String topic, String data Envelope envelope = new Envelope(); envelope.deviceRegistryId = checkNotNull(registryId, "registry id not defined"); envelope.deviceId = deviceId; - envelope.payload = encodeBase64(data); + envelope.payload = ifNotNullGet(data, GeneralUtils::encodeBase64); String[] parts = topic.split("/"); envelope.subFolder = SubFolder.fromValue(parts[0]); envelope.subType = SubType.fromValue(parts[1]); @@ -346,7 +410,7 @@ private void resetSubscription(ProjectSubscriptionName subscriptionName) { throw new RuntimeException("Missing subscription for " + subscriptionName); } catch (Exception e) { throw new RuntimeException( - String.format(SUBSCRIPTION_ERROR_FORMAT, subscriptionName), e); + format(SUBSCRIPTION_ERROR_FORMAT, subscriptionName), e); } } diff --git a/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java b/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java index 60cef515f2..da5da9c732 100644 --- a/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java +++ b/validator/src/main/java/com/google/daq/mqtt/validator/Validator.java @@ -5,9 +5,11 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.daq.mqtt.registrar.Registrar.BASE_DIR; import static com.google.daq.mqtt.sequencer.SequenceBase.EMPTY_MESSAGE; +import static com.google.daq.mqtt.util.ConfigUtil.UDMI_ROOT; import static com.google.daq.mqtt.util.ConfigUtil.UDMI_TOOLS; import static com.google.daq.mqtt.util.ConfigUtil.UDMI_VERSION; import static com.google.daq.mqtt.util.ConfigUtil.readExeConfig; +import static com.google.daq.mqtt.util.PubSubClient.getFeedInfo; import static com.google.daq.mqtt.validator.ReportingDevice.typeFolderPairKey; import static com.google.udmi.util.Common.ERROR_KEY; import static com.google.udmi.util.Common.EXCEPTION_KEY; @@ -36,6 +38,7 @@ import static com.google.udmi.util.SiteModel.DEVICES_DIR; import static java.lang.String.format; import static java.util.Optional.ofNullable; +import static udmi.schema.IotAccess.IotProvider.PUBSUB; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; @@ -49,6 +52,7 @@ import com.github.fge.jsonschema.main.JsonSchemaFactory; import com.google.bos.iot.core.proxy.IotReflectorClient; import com.google.bos.iot.core.proxy.NullPublisher; +import com.google.cloud.Tuple; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -231,7 +235,11 @@ Validator processArgs(List argListRaw) { void execute() { if (!Strings.isNullOrEmpty(config.feed_name)) { - validatePubSub(config.feed_name); + validatePubSub(config.feed_name, true); + } + if (config.iot_provider == PUBSUB) { + Tuple tuple = getFeedInfo(config); + validatePubSub(format("%s/%s", tuple.x(), tuple.y()), false); } if (client == null) { validateReflector(); @@ -308,7 +316,7 @@ private List postProcessArgs(List argList) { case "-p" -> setProjectId(removeNextArg(argList)); case "-s" -> setSiteDir(removeNextArg(argList)); case "-a" -> setSchemaSpec(removeNextArg(argList)); - case "-t" -> validatePubSub(removeNextArg(argList)); + case "-t" -> validatePubSub(removeNextArg(argList), true); case "-f" -> validateFilesOutput(removeNextArg(argList)); case "-u" -> forceUpgrade = true; case "-r" -> validateMessageTrace(removeNextArg(argList)); @@ -327,7 +335,7 @@ private List postProcessArgs(List argList) { return argList; } finally { if (schemaMap == null) { - setSchemaSpec("schema"); + setSchemaSpec(new File(UDMI_ROOT, "schema").getAbsolutePath()); } } } @@ -342,15 +350,15 @@ private void setProjectId(String projectId) { config.project_id = parts[1]; } - private void validatePubSub(String pubSubCombo) { + private void validatePubSub(String pubSubCombo, boolean reflect) { String[] parts = pubSubCombo.split("/"); Preconditions.checkArgument(parts.length <= 2, "Too many parts in pubsub path " + pubSubCombo); - String instName = parts[0]; + String subscriptionId = parts[0]; CloudIotManager cloudIotManager = new CloudIotManager(config.project_id, - new File(config.site_model), null, config.registry_suffix, IotProvider.PUBSUB); + new File(config.site_model), null, config.registry_suffix, PUBSUB); String registryId = getRegistryId(); String updateTopic = parts.length > 1 ? parts[1] : cloudIotManager.getUpdateTopic(); - client = new PubSubClient(config.project_id, registryId, instName, updateTopic); + client = new PubSubClient(config.project_id, registryId, subscriptionId, updateTopic, reflect); if (updateTopic == null) { outputLogger.warn("Not sending to update topic because PubSub update_topic not defined"); } else { @@ -850,8 +858,8 @@ private void writeMessageCapture(Object message, Map attributes) private boolean shouldProcessMessage(Map attributes) { String registryId = attributes.get(DEVICE_REGISTRY_ID_KEY); - if (!registryId.equals(getRegistryId())) { - if (ignoredRegistries.add(registryId)) { + if (registryId == null || !registryId.equals(getRegistryId())) { + if (registryId != null && ignoredRegistries.add(registryId)) { outputLogger.warn("Ignoring data for not-configured registry " + registryId); } return false; diff --git a/validator/src/main/java/com/google/daq/mqtt/util/IotProvider.java b/validator/src/main/java/com/google/udmi/util/IotProvider.java similarity index 98% rename from validator/src/main/java/com/google/daq/mqtt/util/IotProvider.java rename to validator/src/main/java/com/google/udmi/util/IotProvider.java index 2b1246b470..b64debe87f 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/IotProvider.java +++ b/validator/src/main/java/com/google/udmi/util/IotProvider.java @@ -1,4 +1,4 @@ -package com.google.daq.mqtt.util; +package com.google.udmi.util; import java.util.List; import java.util.Map;