Skip to content

Commit

Permalink
Make validator work with //pubsub/ project spec (#977)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu authored Oct 4, 2024
1 parent a49d383 commit 0a51d05
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ md
modbus
MQTT
mydomain
namespace
niversal
npm
nterface
Expand All @@ -118,6 +119,7 @@ proxying
PUBACKs
pubber
PubSub
pubsub
QoS
Readme
repo
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/com/google/udmi/util/Common.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class Common {
public static final String SOURCE_SEPARATOR = "+";
public static final String SOURCE_SEPARATOR_REGEX = "\\" + SOURCE_SEPARATOR;

private static final String PREFIX_SEPARATOR = "~";
public static final String NAMESPACE_SEPARATOR = "~";
private static final String UDMI_VERSION_ENV = "UDMI_TOOLS";
public static final int EXIT_CODE_ERROR = 1;

Expand Down Expand Up @@ -179,6 +179,6 @@ private static String getExceptionMessage(Throwable exception, Class<?> containe
}

public static String getNamespacePrefix(String udmiNamespace) {
return Strings.isNullOrEmpty(udmiNamespace) ? "" : udmiNamespace + PREFIX_SEPARATOR;
return Strings.isNullOrEmpty(udmiNamespace) ? "" : udmiNamespace + NAMESPACE_SEPARATOR;
}
}
45 changes: 45 additions & 0 deletions docs/tools/project_spec.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[**UDMI**](../../) / [**Docs**](../) / [**Tools**](./) / [Project Spec](#)

# Project Specification

`//provider/project[/namespace][+user]`

* `provider`
* `gbos`
* `pref`
* `mqtt`
* `pubsub`
* `project`
* GCP project
* IoT Core project
* `namespace`
* default
* k8s namespace
* pubsub namespace
* `user`
* canonical users
* individual users

* Examples

`//gbos/bos-platform-dev`
`//gbos/bos-platform-dev/peringknife`
`//mqtt/localhost`
`//pubsub/bos-platform-dev/peringknife+debug`
`//pref/bos-platform-dev`

* Tool Support

* registrar
* not `pubsub`
* validator
* allows `pubsub`
* sequencer
*
* pull_messages
* `mqtt`
* `pubsub`




5 changes: 5 additions & 0 deletions docs/tools/readme.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
[**UDMI**](../../) / [**Docs**](../) / [Tools](#)

# Common Arguments

Most tools take a [site model](../specs/site_model.md) as the first argument.
Many tools take a [project spec](project_spec.md) as the second argument.

# Tools

- [keygen](keygen.md) - a script to generate an RSA or ES key for single devices
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.google.daq.mqtt.registrar;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Sets.difference;
import static com.google.common.collect.Sets.intersection;
import static com.google.daq.mqtt.util.ConfigUtil.UDMI_ROOT;
Expand Down Expand Up @@ -42,7 +43,6 @@
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -470,6 +470,9 @@ private void initializeCloudProject() {
cloudIotManager.getCloudRegion(),
cloudIotManager.getRegistryId());

checkState(cloudIotManager.canUpdateCloud(),
"iot provider not properly initialized, can not update cloud");

if (cloudIotManager.getUpdateTopic() != null) {
updatePusher = new PubSubPusher(projectId, cloudIotManager.getUpdateTopic());
}
Expand Down Expand Up @@ -1011,7 +1014,7 @@ private Set<Entry<String, String>> getBindings(Set<String> deviceSet, LocalDevic
System.err.printf("Binding devices to %s, already bound: %s%n",
gatewayId, JOIN_CSV.join(boundDevices));
int total = cloudModels.size() != 0 ? cloudModels.size() : localDevices.size();
Preconditions.checkState(boundDevices.size() != total,
checkState(boundDevices.size() != total,
"all devices including the gateway can't be bound to one gateway!");
return localDevice.getSettings().proxyDevices.stream()
.filter(proxyDevice -> deviceSet == null || deviceSet.contains(proxyDevice))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.collect.ImmutableList;
import com.google.udmi.util.GeneralUtils;
import com.google.udmi.util.IotProvider;
import com.google.udmi.util.MetadataMapKeys;
import com.google.udmi.util.SiteModel;
import java.io.File;
Expand Down Expand Up @@ -173,7 +174,9 @@ private static Resource_type gatewayIfTrue(boolean isGateway) {
private void initializeIotProvider() {
try {
iotProvider = makeIotProvider();
System.err.println("Created service for project " + projectId);
System.err.printf(
"Instantiated iot provider %s as %s%n", executionConfiguration.iot_provider,
ofNullable(iotProvider).map(p -> p.getClass().getSimpleName()).orElse("undefined"));
} catch (Exception e) {
throw new RuntimeException("While initializing Cloud IoT project " + projectId, e);
}
Expand Down Expand Up @@ -217,7 +220,7 @@ public boolean registerDevice(String deviceId, CloudDeviceSettings settings) {
} else {
exceptions.capture("updating", () -> updateDevice(deviceId, settings, device));
}

if (settings.config != null) {
exceptions.capture("configuring", () -> writeDeviceConfig(deviceId, settings.config));
}
Expand Down Expand Up @@ -254,11 +257,11 @@ public CloudModel getRegisteredDevice(String deviceId) {
}

private void writeDeviceConfig(String deviceId, String config) {
iotProvider.updateConfig(deviceId, SubFolder.UPDATE, config);
getIotProvider().updateConfig(deviceId, SubFolder.UPDATE, config);
}

public void modifyConfig(String deviceId, SubFolder subFolder, String config) {
iotProvider.updateConfig(deviceId, subFolder, config);
getIotProvider().updateConfig(deviceId, subFolder, config);
}

/**
Expand All @@ -268,7 +271,7 @@ public void modifyConfig(String deviceId, SubFolder subFolder, String config) {
* @param blocked should this device be blocked?
*/
public void blockDevice(String deviceId, boolean blocked) {
iotProvider.setBlocked(deviceId, blocked);
getIotProvider().setBlocked(deviceId, blocked);
}

private CloudModel makeDevice(CloudDeviceSettings settings, CloudModel oldDevice) {
Expand Down Expand Up @@ -304,14 +307,14 @@ private List<Credential> getCredentials(CloudDeviceSettings settings) {
private void createDevice(String deviceId, CloudDeviceSettings settings) {
CloudModel newDevice = makeDevice(settings, null);
limitValueSizes(newDevice.metadata);
iotProvider.createResource(deviceId, newDevice);
getIotProvider().createResource(deviceId, newDevice);
deviceMap.put(deviceId, newDevice);
}

private void updateDevice(String deviceId, CloudDeviceSettings settings, CloudModel oldDevice) {
CloudModel device = makeDevice(settings, oldDevice);
limitValueSizes(device.metadata);
iotProvider.updateDevice(deviceId, device);
getIotProvider().updateDevice(deviceId, device);
}

/**
Expand All @@ -320,7 +323,7 @@ private void updateDevice(String deviceId, CloudDeviceSettings settings, CloudMo
public void modifyDevice(String deviceId, CloudModel update) {
limitValueSizes(update.metadata);
update.operation = Operation.MODIFY;
iotProvider.updateDevice(deviceId, update);
getIotProvider().updateDevice(deviceId, update);
}

private void limitValueSizes(Map<String, String> metadata) {
Expand All @@ -330,7 +333,7 @@ private void limitValueSizes(Map<String, String> metadata) {
}

public SetupUdmiConfig getVersionInformation() {
return iotProvider.getVersionInformation();
return getIotProvider().getVersionInformation();
}

/**
Expand All @@ -339,7 +342,7 @@ public SetupUdmiConfig getVersionInformation() {
* @return registered device list
*/
public Map<String, CloudModel> fetchCloudModels() {
return iotProvider.fetchCloudModels(null);
return getIotProvider().fetchCloudModels(null);
}

/**
Expand All @@ -348,15 +351,15 @@ public Map<String, CloudModel> fetchCloudModels() {
* @return registered device list
*/
public Set<String> fetchBoundDevices(String gatewayId) {
return ifNotNullGet(iotProvider.fetchCloudModels(gatewayId), Map::keySet);
return ifNotNullGet(getIotProvider().fetchCloudModels(gatewayId), Map::keySet);
}

public CloudModel fetchDevice(String deviceId) {
return deviceMap.computeIfAbsent(deviceId, this::fetchDeviceRaw);
}

private CloudModel fetchDeviceRaw(String deviceId) {
return iotProvider.fetchDevice(deviceId);
return getIotProvider().fetchDevice(deviceId);
}

/**
Expand Down Expand Up @@ -405,19 +408,19 @@ public Object getCloudRegion() {
}

public void bindDevice(String proxyDeviceId, String gatewayDeviceId) {
iotProvider.bindDeviceToGateway(proxyDeviceId, gatewayDeviceId);
getIotProvider().bindDeviceToGateway(proxyDeviceId, gatewayDeviceId);
}

public List<Object> getMockActions() {
return iotProvider.getMockActions();
return getIotProvider().getMockActions();
}

public void shutdown() {
iotProvider.shutdown();
ifNotNullThen(iotProvider, IotProvider::shutdown);
}

public void deleteDevice(String deviceId) {
iotProvider.deleteDevice(deviceId);
getIotProvider().deleteDevice(deviceId);
deviceMap.remove(deviceId);
}

Expand All @@ -427,8 +430,8 @@ public void deleteDevice(String deviceId) {
public String createRegistry(String suffix) {
CloudModel settings = new CloudModel();
settings.resource_type = Resource_type.REGISTRY;
settings.credentials = List.of(iotProvider.getCredential());
iotProvider.createResource(suffix, settings);
settings.credentials = List.of(getIotProvider().getCredential());
getIotProvider().createResource(suffix, settings);
return requireNonNull(settings.num_id, "Missing registry name in reply");
}

Expand All @@ -445,10 +448,18 @@ public void updateRegistry(SiteMetadata siteMetadata) {
registryModel.metadata.put(
MetadataMapKeys.UDMI_METADATA, toJsonString(siteMetadata)
);
iotProvider.updateRegistry(registryModel);
getIotProvider().updateRegistry(registryModel);
}

public String getSiteDir() {
return executionConfiguration.site_model;
}

private IotProvider getIotProvider() {
return checkNotNull(iotProvider, "iot provider not properly initialized");
}

public boolean canUpdateCloud() {
return iotProvider != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static com.google.daq.mqtt.util.IotMockProvider.ActionType.UPDATE_REGISTRY_ACTION;
import static udmi.schema.CloudModel.Resource_type.GATEWAY;

import com.google.udmi.util.IotProvider;
import com.google.udmi.util.SiteModel;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.daq.mqtt.util.MessagePublisher.QuerySpeed;
import com.google.daq.mqtt.validator.Validator;
import com.google.daq.mqtt.validator.Validator.MessageBundle;
import com.google.udmi.util.IotProvider;
import com.google.udmi.util.SiteModel;
import java.io.File;
import java.io.PrintWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static java.util.Optional.ofNullable;
import static udmi.schema.IotAccess.IotProvider.IMPLICIT;
import static udmi.schema.IotAccess.IotProvider.JWT;
import static udmi.schema.IotAccess.IotProvider.PREF;

import com.google.bos.iot.core.proxy.MqttPublisher;
import com.google.daq.mqtt.validator.Validator.MessageBundle;
Expand All @@ -13,7 +12,7 @@
import java.util.function.Consumer;
import udmi.schema.Credential;
import udmi.schema.ExecutionConfiguration;
import udmi.schema.IotAccess;
import udmi.schema.IotAccess.IotProvider;
import udmi.schema.SetupUdmiConfig;

/**
Expand All @@ -34,14 +33,16 @@ static String getRegistryId(ExecutionConfiguration config) {
*/
static MessagePublisher from(ExecutionConfiguration iotConfig,
BiConsumer<String, String> messageHandler, Consumer<Throwable> errorHandler) {
IotAccess.IotProvider iotProvider = ofNullable(iotConfig.iot_provider).orElse(JWT);
IotProvider iotProvider = ofNullable(iotConfig.iot_provider).orElse(JWT);
if (iotConfig.reflector_endpoint != null && iotProvider != IMPLICIT) {
iotConfig.reflector_endpoint = null;
}
if (iotProvider == PREF) {
return PubSubReflector.from(iotConfig, messageHandler, errorHandler);
}
return MqttPublisher.from(iotConfig, messageHandler, errorHandler);
return switch (iotProvider) {
case PREF -> PubSubReflector.from(iotConfig, messageHandler, errorHandler);
case MQTT, JWT, GBOS -> MqttPublisher.from(iotConfig, messageHandler, errorHandler);
case PUBSUB -> PubSubClient.from(iotConfig, messageHandler, errorHandler);
default -> throw new RuntimeException("Unsupported iot provider " + iotProvider);
};
}

String publish(String deviceId, String topic, String data);
Expand Down
Loading

0 comments on commit 0a51d05

Please sign in to comment.