diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java index a67ade402e..fbac13de4e 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/BaseCoapStack.java @@ -40,6 +40,7 @@ import org.eclipse.californium.core.network.ExchangeCompleteException; import org.eclipse.californium.core.network.Outbox; import org.eclipse.californium.core.network.stack.Layer.TopDownBuilder; +import org.eclipse.californium.core.observe.ObservationStoreException; import org.eclipse.californium.core.server.MessageDeliverer; /** @@ -84,6 +85,9 @@ public void sendRequest(final Exchange exchange, final Request request) { // delegate to top try { top.sendRequest(exchange, request); + } catch (ObservationStoreException ex) { + LOGGER.debug("error send request {} - {}", request, ex.getMessage()); + request.setSendError(ex); } catch (RuntimeException ex) { LOGGER.warn("error send request {}", request, ex); request.setSendError(ex); diff --git a/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStore.java b/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStore.java index 2b728a919a..d886d0868f 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStore.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStore.java @@ -43,6 +43,7 @@ public interface ObservationStore { * @return the previous value associated with the specified key, or * {@code null} if there was no mapping for the key. * @throws NullPointerException if token or observation is {@code null}. + * @throws ObservationStoreException if observation isn't stored. */ Observation putIfAbsent(Token token, Observation obs); @@ -56,6 +57,7 @@ public interface ObservationStore { * @return the previous value associated with the specified key, or * {@code null} if there was no mapping for the key. * @throws NullPointerException if token or observation is {@code null}. + * @throws ObservationStoreException if observation isn't stored. */ Observation put(Token token, Observation obs); diff --git a/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStoreException.java b/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStoreException.java new file mode 100644 index 0000000000..48b9ffbceb --- /dev/null +++ b/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStoreException.java @@ -0,0 +1,35 @@ +/******************************************************************************* + * Copyright (c) 2020 Bosch IO GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v20.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.html. + * + * Contributors: + * Bosch IO GmbH - initial implementation + ******************************************************************************/ +package org.eclipse.californium.core.observe; + +/** + * Exception indicating, that a observation could not be stored. + * + * @see ObservationStore + */ +public class ObservationStoreException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + /** + * Create new instance with message. + * + * @param message message + */ + public ObservationStoreException(String message) { + super(message); + } +} diff --git a/californium-core/src/test/java/org/eclipse/californium/core/test/ObserveTest.java b/californium-core/src/test/java/org/eclipse/californium/core/test/ObserveTest.java index cf02049ee0..e1f6aae786 100644 --- a/californium-core/src/test/java/org/eclipse/californium/core/test/ObserveTest.java +++ b/californium-core/src/test/java/org/eclipse/californium/core/test/ObserveTest.java @@ -31,12 +31,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.californium.TestTools; import org.eclipse.californium.category.Medium; @@ -49,11 +54,17 @@ import org.eclipse.californium.core.coap.EmptyMessage; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; +import org.eclipse.californium.core.coap.Token; import org.eclipse.californium.core.network.CoapEndpoint; import org.eclipse.californium.core.network.EndpointManager; import org.eclipse.californium.core.network.config.NetworkConfig; import org.eclipse.californium.core.network.interceptors.MessageInterceptorAdapter; +import org.eclipse.californium.core.observe.Observation; +import org.eclipse.californium.core.observe.ObservationStore; +import org.eclipse.californium.core.observe.ObservationUtil; +import org.eclipse.californium.core.observe.ObservationStoreException; import org.eclipse.californium.core.server.resources.CoapExchange; +import org.eclipse.californium.elements.EndpointContext; import org.eclipse.californium.elements.rule.TestNameLoggerRule; import org.eclipse.californium.rule.CoapNetworkRule; import org.eclipse.californium.rule.CoapThreadsRule; @@ -117,6 +128,7 @@ public class ObserveTest { private CoapEndpoint serverEndpoint; private MyResource resourceX; private MyResource resourceY; + private MyObservationStore observations; private ClientMessageInterceptor interceptor; private final CountDownLatch waitforit = new CountDownLatch(1); @@ -185,6 +197,17 @@ public void testObserveLifecycle() throws Exception { // check that relations to resource X AND Y have been canceled assertTrue(resourceX.getObserverCount() == 0); assertTrue(resourceY.getObserverCount() == 0); + + observations.setStoreException(new ObservationStoreException("test")); + + Request requestC = Request.newGet(); + requestC.setURI(uriY); + requestC.setObserve(); + requestC.send(); + + Response responseC = requestC.waitForResponse(1000); + assertNull("Client received unexpected response", responseC); + assertNotNull("send error expected", requestC.getSendError()); } @Test @@ -384,6 +407,7 @@ private CoapServer createServer() { CoapEndpoint.Builder builder = new CoapEndpoint.Builder(); builder.setInetSocketAddress(TestTools.LOCALHOST_EPHEMERAL); builder.setNetworkConfig(config); + serverEndpoint = builder.build(); CoapServer server = new CoapServer(config); @@ -396,6 +420,17 @@ private CoapServer createServer() { uriX = TestTools.getUri(serverEndpoint, TARGET_X); uriY = TestTools.getUri(serverEndpoint, TARGET_Y); + + observations = new MyObservationStore(config); + + // setup the client endpoint using the special observation store + builder = new CoapEndpoint.Builder(); + builder.setInetSocketAddress(TestTools.LOCALHOST_EPHEMERAL); + builder.setNetworkConfig(config); + builder.setObservationStore(observations); + CoapEndpoint coapEndpoint = builder.build(); + EndpointManager.getEndpointManager().setDefaultEndpoint(coapEndpoint); + return server; } @@ -536,4 +571,112 @@ public void prepareResponse() { } } + + /** + * An observation store that keeps all observations in-memory. + */ + private static class MyObservationStore implements ObservationStore { + + private final ConcurrentMap map = new ConcurrentHashMap<>(); + private final NetworkConfig config; + private volatile AtomicReference exception = new AtomicReference(); + + public MyObservationStore(NetworkConfig config) { + this.config = config; + } + + public void setStoreException(ObservationStoreException exception) { + this.exception.set(exception); + } + + @Override + public void setExecutor(ScheduledExecutorService executor) { + } + + @Override + public Observation putIfAbsent(Token key, Observation obs) { + if (key == null) { + throw new NullPointerException("token must not be null"); + } else if (obs == null) { + throw new NullPointerException("observation must not be null"); + } else { + ObservationStoreException exception = this.exception.getAndSet(null); + if (exception != null) { + throw exception; + } + return map.putIfAbsent(key, obs); + } + } + + @Override + public Observation put(Token key, Observation obs) { + if (key == null) { + throw new NullPointerException("token must not be null"); + } else if (obs == null) { + throw new NullPointerException("observation must not be null"); + } else { + ObservationStoreException exception = this.exception.getAndSet(null); + if (exception != null) { + throw exception; + } + return map.put(key, obs); + } + } + + @Override + public Observation get(Token token) { + if (token == null) { + return null; + } else { + Observation obs = map.get(token); + // clone request in order to prevent accumulation of + // message observers on original request + return ObservationUtil.shallowClone(obs); + } + } + + @Override + public void remove(Token token) { + if (token != null) { + map.remove(token); + } + } + + /** + * Gets the number of observations currently held in this store. + * + * @return The number of observations. + */ + public int getSize() { + return map.size(); + } + + /** + * Removes all observations from this store. + */ + public void clear() { + map.clear(); + } + + @Override + public void setContext(Token token, final EndpointContext ctx) { + + if (token != null && ctx != null) { + Observation obs = map.get(token); + if (obs != null) { + map.replace(token, obs, new Observation(obs.getRequest(), ctx)); + } + } + } + + @Override + public void start() { + } + + @Override + public void stop() { + } + } + + }