Skip to content

Commit

Permalink
Experimentally relax the constraints of the exchange executor.
Browse files Browse the repository at this point in the history
Don't fail, when a null executor is passed in.
Still not supported and may fail later, including hard to find race
conditions.

Signed-off-by: Achim Kraus <achim.kraus@cloudcoap.net>
  • Loading branch information
boaks committed Jun 28, 2023
1 parent 222a945 commit ec7de43
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.eclipse.californium.elements.EndpointContext;
import org.eclipse.californium.elements.EndpointIdentityResolver;
import org.eclipse.californium.elements.UdpMulticastConnector;
import org.eclipse.californium.elements.util.CheckedExecutor;
import org.eclipse.californium.elements.util.ClockUtil;
import org.eclipse.californium.elements.util.SerialExecutor;
import org.eclipse.californium.elements.util.StringUtil;
Expand Down Expand Up @@ -132,7 +133,31 @@
public class Exchange {

private static final Logger LOGGER = LoggerFactory.getLogger(Exchange.class);


/**
* Dummy executor when {@code null} is provided. Experimental, not supported
* and may fail.
*
* @since 3.9
*/
private static final CheckedExecutor DUMMY_EXECUTOR = new CheckedExecutor() {

@Override
public void execute(Runnable command) {
command.run();
}

@Override
public boolean checkOwner() {
return true;
}

@Override
public void assertOwner() {
// no check applied
}
};

static final boolean DEBUG = LOGGER.isTraceEnabled();

private static final int MAX_OBSERVE_NO = (1 << 24) - 1;
Expand Down Expand Up @@ -170,7 +195,7 @@ public enum Origin {
*
* @since 3.0 (changed from optional (unit tests) to mandatory)
*/
private final SerialExecutor executor;
private final CheckedExecutor executor;
/** The nano timestamp when this exchange has been created */
private final long nanoTimestamp;
/**
Expand Down Expand Up @@ -207,7 +232,10 @@ public enum Origin {
*/
private volatile Endpoint endpoint;

/** An remove handler to be called when a exchange must be removed from the exchange store */
/**
* An remove handler to be called when a exchange must be removed from the
* exchange store
*/
private volatile RemoveHandler removeHandler;

/** Indicates if the exchange is complete */
Expand Down Expand Up @@ -286,7 +314,8 @@ public enum Origin {
// true if the exchange has failed due to a timeout
private volatile boolean timedOut;

// the timeout scale factor, exponential back-off between retransmissions, if larger than 1.0F.
// the timeout scale factor, exponential back-off between retransmissions,
// if larger than 1.0F.
private float timeoutScale;

// the timeout of the current request or response set by reliability layer
Expand Down Expand Up @@ -327,19 +356,24 @@ public enum Origin {

private volatile EndpointContextOperator endpointContextPreOperator;

//If object security option is used, the Cryptographic context identifier is stored here
// If object security option is used, the Cryptographic context identifier
// is stored here
// for request/response mapping of contexts
private byte[] cryptoContextId;

/**
* Creates a new exchange with the specified request and origin.
*
* Note: since 3.9 {@code null} as executor doesn't longer fail with a
* {@link NullPointerException}. Using {@code null} is still not supported
* and comes with risks, that especially requires your own responsibility.
*
* @param request the request that starts the exchange
* @param peersIdentity peer's identity. Usually that's the peer's
* {@link InetSocketAddress}.
* @param origin the origin of the request (LOCAL or REMOTE)
* @param executor executor to be used for exchanges.
* @throws NullPointerException if request or executor is {@code null}
* @throws NullPointerException if request is {@code null}
* @see EndpointIdentityResolver
* @since 3.0 (added peersIdentity, executor adapted to mandatory)
*/
Expand All @@ -351,6 +385,10 @@ public Exchange(Request request, Object peersIdentity, Origin origin, Executor e
* Creates a new exchange with the specified request, origin, context, and
* notification marker.
*
* Note: since 3.9 {@code null} as executor doesn't longer fail with a
* {@link NullPointerException}. Using {@code null} is still not supported
* and comes with risks, that especially requires your own responsibility.
*
* @param request the request that starts the exchange
* @param peersIdentity peer's identity. Usually that's the peer's
* {@link InetSocketAddress}.
Expand All @@ -359,19 +397,21 @@ public Exchange(Request request, Object peersIdentity, Origin origin, Executor e
* @param ctx the endpoint context of this exchange
* @param notification {@code true} for notification exchange, {@code false}
* otherwise
* @throws NullPointerException if request or executor is {@code null}
* @throws NullPointerException if request is {@code null}
* @see EndpointIdentityResolver
* @since 3.0 (added peersIdentity, executor adapted to mandatory)
*/
public Exchange(Request request, Object peersIdentity, Origin origin, Executor executor, EndpointContext ctx, boolean notification) {
public Exchange(Request request, Object peersIdentity, Origin origin, Executor executor, EndpointContext ctx,
boolean notification) {
// might only be the first block of the whole request
if (request == null) {
throw new NullPointerException("request must not be null!");
} else if (executor == null) {
throw new NullPointerException("executor must not be null");
// Dummy executor.
executor = DUMMY_EXECUTOR;
}
this.id = INSTANCE_COUNTER.incrementAndGet();
this.executor = new SerialExecutor(executor);
this.executor = executor instanceof CheckedExecutor ? (CheckedExecutor) executor : new SerialExecutor(executor);
this.currentRequest = request;
this.request = request;
this.origin = origin;
Expand Down Expand Up @@ -628,12 +668,12 @@ public void setCurrentRequest(Request newCurrentRequest) {

/**
* Returns the response to the request or {@code null}, if no response has
* arrived yet.
* arrived yet.
*
* If there is an observe relation, the last received
* notification is the response on the client side. On the server side, that
* is the last notification to be sent, but may differ from the current
* response, if that is in transit.
* If there is an observe relation, the last received notification is the
* response on the client side. On the server side, that is the last
* notification to be sent, but may differ from the current response, if
* that is in transit.
*
* @return the response. or {@code null},
*/
Expand Down Expand Up @@ -878,7 +918,8 @@ public int incrementFailedTransmissionCount() {
}

/**
* Get timeout scale factor for exponential back-off between retransmissions.
* Get timeout scale factor for exponential back-off between
* retransmissions.
*
* @return timeout scale factor for exponential back-off.
* @since 3.0
Expand Down Expand Up @@ -1049,8 +1090,9 @@ public Throwable getCaller() {
* <p>
* This means that both request and response have been sent/received.
* <p>
* This method invokes the {@linkplain RemoveHandler#remove(Exchange, KeyToken, KeyMID)
* remove} method on the observer registered on this exchange (if any).
* This method invokes the
* {@linkplain RemoveHandler#remove(Exchange, KeyToken, KeyMID) remove}
* method on the observer registered on this exchange (if any).
* <p>
* Call this method to trigger a clean-up in the Matcher through its
* ExchangeObserverImpl. Usually, it is called automatically when reaching
Expand Down Expand Up @@ -1310,8 +1352,9 @@ public void removeNotifications() {
* exchange to increase security when matching an incoming response to this
* exchange's request.
* </p>
* If a {@link #setEndpointContextPreOperator(EndpointContextOperator)} is used,
* this pre-operator is called before the endpoint context is set and forwarded.
* If a {@link #setEndpointContextPreOperator(EndpointContextOperator)} is
* used, this pre-operator is called before the endpoint context is set and
* forwarded.
*
* @param ctx the endpoint context information
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/********************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* https://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v1.0 which is available at
* https://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
********************************************************************************/
package org.eclipse.californium.elements.util;

import java.util.ConcurrentModificationException;
import java.util.concurrent.Executor;

/**
* Checked owner executor.
*
* @since 3.9
*/
public interface CheckedExecutor extends Executor {

/**
* Assert, that the current thread executes the current job.
*
* @throws ConcurrentModificationException if current thread doesn't execute
* the current job
*/
void assertOwner();

/**
* Check, if current thread executes the current job.
*
* @return {@code true}, if current thread executes the current job,
* {@code false}, otherwise.
*/
boolean checkOwner();

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
*
* Serialize job execution before passing the jobs to a provided executor.
*/
public class SerialExecutor extends AbstractExecutorService {
public class SerialExecutor extends AbstractExecutorService implements CheckedExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(SerialExecutor.class);

Expand Down Expand Up @@ -119,12 +119,11 @@ public void execute(final Runnable command) {
}

/**
* Assert, that the current thread executes the
* {@link #currentlyExecutedJob}.
* {@inheritDoc}
*
* @throws ConcurrentModificationException if current thread doesn't execute
* the {@link #currentlyExecutedJob}.
*/
* {@link #currentlyExecutedJob} is used for the current job.
*/
@Override
public void assertOwner() {
final Thread me = Thread.currentThread();
if (owner.get() != me) {
Expand All @@ -138,11 +137,11 @@ public void assertOwner() {
}

/**
* Check, if current thread executes the {@link #currentlyExecutedJob}.
* {@inheritDoc}
*
* @return {@code true}, if current thread executes the
* {@link #currentlyExecutedJob}, {@code false}, otherwise.
*/
* {@link #currentlyExecutedJob} is used for the current job.
*/
@Override
public boolean checkOwner() {
return owner.get() == Thread.currentThread();
}
Expand Down

0 comments on commit ec7de43

Please sign in to comment.