Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce store exception. #1207

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.eclipse.californium.core.network.ExchangeCompleteException;
import org.eclipse.californium.core.network.Outbox;
import org.eclipse.californium.core.network.stack.Layer.TopDownBuilder;
import org.eclipse.californium.core.observe.ObservationStoreException;
import org.eclipse.californium.core.server.MessageDeliverer;

/**
Expand Down Expand Up @@ -84,6 +85,9 @@ public void sendRequest(final Exchange exchange, final Request request) {
// delegate to top
try {
top.sendRequest(exchange, request);
} catch (ObservationStoreException ex) {
LOGGER.debug("error send request {} - {}", request, ex.getMessage());
request.setSendError(ex);
} catch (RuntimeException ex) {
LOGGER.warn("error send request {}", request, ex);
request.setSendError(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public interface ObservationStore {
* @return the previous value associated with the specified key, or
* {@code null} if there was no mapping for the key.
* @throws NullPointerException if token or observation is {@code null}.
* @throws ObservationStoreException if observation isn't stored.
*/
Observation putIfAbsent(Token token, Observation obs);

Expand All @@ -56,6 +57,7 @@ public interface ObservationStore {
* @return the previous value associated with the specified key, or
* {@code null} if there was no mapping for the key.
* @throws NullPointerException if token or observation is {@code null}.
* @throws ObservationStoreException if observation isn't stored.
*/
Observation put(Token token, Observation obs);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*******************************************************************************
* Copyright (c) 2020 Bosch IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v20.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Bosch IO GmbH - initial implementation
******************************************************************************/
package org.eclipse.californium.core.observe;

/**
* Exception indicating, that a observation could not be stored.
*
* @see ObservationStore
*/
public class ObservationStoreException extends RuntimeException {

private static final long serialVersionUID = 1L;

/**
* Create new instance with message.
*
* @param message message
*/
public ObservationStoreException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.californium.TestTools;
import org.eclipse.californium.category.Medium;
Expand All @@ -49,11 +54,17 @@
import org.eclipse.californium.core.coap.EmptyMessage;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.coap.Token;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.EndpointManager;
import org.eclipse.californium.core.network.config.NetworkConfig;
import org.eclipse.californium.core.network.interceptors.MessageInterceptorAdapter;
import org.eclipse.californium.core.observe.Observation;
import org.eclipse.californium.core.observe.ObservationStore;
import org.eclipse.californium.core.observe.ObservationUtil;
import org.eclipse.californium.core.observe.ObservationStoreException;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.elements.EndpointContext;
import org.eclipse.californium.elements.rule.TestNameLoggerRule;
import org.eclipse.californium.rule.CoapNetworkRule;
import org.eclipse.californium.rule.CoapThreadsRule;
Expand Down Expand Up @@ -117,6 +128,7 @@ public class ObserveTest {
private CoapEndpoint serverEndpoint;
private MyResource resourceX;
private MyResource resourceY;
private MyObservationStore observations;
private ClientMessageInterceptor interceptor;

private final CountDownLatch waitforit = new CountDownLatch(1);
Expand Down Expand Up @@ -185,6 +197,17 @@ public void testObserveLifecycle() throws Exception {
// check that relations to resource X AND Y have been canceled
assertTrue(resourceX.getObserverCount() == 0);
assertTrue(resourceY.getObserverCount() == 0);

observations.setStoreException(new ObservationStoreException("test"));

Request requestC = Request.newGet();
requestC.setURI(uriY);
requestC.setObserve();
requestC.send();

Response responseC = requestC.waitForResponse(1000);
assertNull("Client received unexpected response", responseC);
assertNotNull("send error expected", requestC.getSendError());
}

@Test
Expand Down Expand Up @@ -384,6 +407,7 @@ private CoapServer createServer() {
CoapEndpoint.Builder builder = new CoapEndpoint.Builder();
builder.setInetSocketAddress(TestTools.LOCALHOST_EPHEMERAL);
builder.setNetworkConfig(config);

serverEndpoint = builder.build();

CoapServer server = new CoapServer(config);
Expand All @@ -396,6 +420,17 @@ private CoapServer createServer() {

uriX = TestTools.getUri(serverEndpoint, TARGET_X);
uriY = TestTools.getUri(serverEndpoint, TARGET_Y);

observations = new MyObservationStore(config);

// setup the client endpoint using the special observation store
builder = new CoapEndpoint.Builder();
builder.setInetSocketAddress(TestTools.LOCALHOST_EPHEMERAL);
builder.setNetworkConfig(config);
builder.setObservationStore(observations);
CoapEndpoint coapEndpoint = builder.build();
EndpointManager.getEndpointManager().setDefaultEndpoint(coapEndpoint);

return server;
}

Expand Down Expand Up @@ -536,4 +571,112 @@ public void prepareResponse() {
}

}

/**
* An observation store that keeps all observations in-memory.
*/
private static class MyObservationStore implements ObservationStore {

private final ConcurrentMap<Token, Observation> map = new ConcurrentHashMap<>();
private final NetworkConfig config;
private volatile AtomicReference<ObservationStoreException> exception = new AtomicReference<ObservationStoreException>();

public MyObservationStore(NetworkConfig config) {
this.config = config;
}

public void setStoreException(ObservationStoreException exception) {
this.exception.set(exception);
}

@Override
public void setExecutor(ScheduledExecutorService executor) {
}

@Override
public Observation putIfAbsent(Token key, Observation obs) {
if (key == null) {
throw new NullPointerException("token must not be null");
} else if (obs == null) {
throw new NullPointerException("observation must not be null");
} else {
ObservationStoreException exception = this.exception.getAndSet(null);
if (exception != null) {
throw exception;
}
return map.putIfAbsent(key, obs);
}
}

@Override
public Observation put(Token key, Observation obs) {
if (key == null) {
throw new NullPointerException("token must not be null");
} else if (obs == null) {
throw new NullPointerException("observation must not be null");
} else {
ObservationStoreException exception = this.exception.getAndSet(null);
if (exception != null) {
throw exception;
}
return map.put(key, obs);
}
}

@Override
public Observation get(Token token) {
if (token == null) {
return null;
} else {
Observation obs = map.get(token);
// clone request in order to prevent accumulation of
// message observers on original request
return ObservationUtil.shallowClone(obs);
}
}

@Override
public void remove(Token token) {
if (token != null) {
map.remove(token);
}
}

/**
* Gets the number of observations currently held in this store.
*
* @return The number of observations.
*/
public int getSize() {
return map.size();
}

/**
* Removes all observations from this store.
*/
public void clear() {
map.clear();
}

@Override
public void setContext(Token token, final EndpointContext ctx) {

if (token != null && ctx != null) {
Observation obs = map.get(token);
if (obs != null) {
map.replace(token, obs, new Observation(obs.getRequest(), ctx));
}
}
}

@Override
public void start() {
}

@Override
public void stop() {
}
}


}