Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimentally relax the constraints of the exchange executor. #2153

Merged
merged 1 commit into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.eclipse.californium.elements.EndpointContext;
import org.eclipse.californium.elements.EndpointIdentityResolver;
import org.eclipse.californium.elements.UdpMulticastConnector;
import org.eclipse.californium.elements.util.CheckedExecutor;
import org.eclipse.californium.elements.util.ClockUtil;
import org.eclipse.californium.elements.util.SerialExecutor;
import org.eclipse.californium.elements.util.StringUtil;
Expand Down Expand Up @@ -132,7 +133,31 @@
public class Exchange {

private static final Logger LOGGER = LoggerFactory.getLogger(Exchange.class);


/**
* Dummy executor when {@code null} is provided. Experimental, not supported
* and may fail.
*
* @since 3.9
*/
private static final CheckedExecutor DUMMY_EXECUTOR = new CheckedExecutor() {

@Override
public void execute(Runnable command) {
command.run();
}

@Override
public boolean checkOwner() {
return true;
}

@Override
public void assertOwner() {
// no check applied
}
};

static final boolean DEBUG = LOGGER.isTraceEnabled();

private static final int MAX_OBSERVE_NO = (1 << 24) - 1;
Expand Down Expand Up @@ -170,7 +195,7 @@ public enum Origin {
*
* @since 3.0 (changed from optional (unit tests) to mandatory)
*/
private final SerialExecutor executor;
private final CheckedExecutor executor;
/** The nano timestamp when this exchange has been created */
private final long nanoTimestamp;
/**
Expand Down Expand Up @@ -207,7 +232,10 @@ public enum Origin {
*/
private volatile Endpoint endpoint;

/** An remove handler to be called when a exchange must be removed from the exchange store */
/**
* An remove handler to be called when a exchange must be removed from the
* exchange store
*/
private volatile RemoveHandler removeHandler;

/** Indicates if the exchange is complete */
Expand Down Expand Up @@ -286,7 +314,8 @@ public enum Origin {
// true if the exchange has failed due to a timeout
private volatile boolean timedOut;

// the timeout scale factor, exponential back-off between retransmissions, if larger than 1.0F.
// the timeout scale factor, exponential back-off between retransmissions,
// if larger than 1.0F.
private float timeoutScale;

// the timeout of the current request or response set by reliability layer
Expand Down Expand Up @@ -327,19 +356,24 @@ public enum Origin {

private volatile EndpointContextOperator endpointContextPreOperator;

//If object security option is used, the Cryptographic context identifier is stored here
// If object security option is used, the Cryptographic context identifier
// is stored here
// for request/response mapping of contexts
private byte[] cryptoContextId;

/**
* Creates a new exchange with the specified request and origin.
*
* Note: since 3.9 {@code null} as executor doesn't longer fail with a
* {@link NullPointerException}. Using {@code null} is still not supported
* and comes with risks, that especially requires your own responsibility.
*
* @param request the request that starts the exchange
* @param peersIdentity peer's identity. Usually that's the peer's
* {@link InetSocketAddress}.
* @param origin the origin of the request (LOCAL or REMOTE)
* @param executor executor to be used for exchanges.
* @throws NullPointerException if request or executor is {@code null}
* @throws NullPointerException if request is {@code null}
* @see EndpointIdentityResolver
* @since 3.0 (added peersIdentity, executor adapted to mandatory)
*/
Expand All @@ -351,6 +385,10 @@ public Exchange(Request request, Object peersIdentity, Origin origin, Executor e
* Creates a new exchange with the specified request, origin, context, and
* notification marker.
*
* Note: since 3.9 {@code null} as executor doesn't longer fail with a
* {@link NullPointerException}. Using {@code null} is still not supported
* and comes with risks, that especially requires your own responsibility.
*
* @param request the request that starts the exchange
* @param peersIdentity peer's identity. Usually that's the peer's
* {@link InetSocketAddress}.
Expand All @@ -359,19 +397,21 @@ public Exchange(Request request, Object peersIdentity, Origin origin, Executor e
* @param ctx the endpoint context of this exchange
* @param notification {@code true} for notification exchange, {@code false}
* otherwise
* @throws NullPointerException if request or executor is {@code null}
* @throws NullPointerException if request is {@code null}
* @see EndpointIdentityResolver
* @since 3.0 (added peersIdentity, executor adapted to mandatory)
*/
public Exchange(Request request, Object peersIdentity, Origin origin, Executor executor, EndpointContext ctx, boolean notification) {
public Exchange(Request request, Object peersIdentity, Origin origin, Executor executor, EndpointContext ctx,
boolean notification) {
// might only be the first block of the whole request
if (request == null) {
throw new NullPointerException("request must not be null!");
} else if (executor == null) {
throw new NullPointerException("executor must not be null");
// Dummy executor.
executor = DUMMY_EXECUTOR;
}
this.id = INSTANCE_COUNTER.incrementAndGet();
this.executor = new SerialExecutor(executor);
this.executor = executor instanceof CheckedExecutor ? (CheckedExecutor) executor : new SerialExecutor(executor);
this.currentRequest = request;
this.request = request;
this.origin = origin;
Expand Down Expand Up @@ -628,12 +668,12 @@ public void setCurrentRequest(Request newCurrentRequest) {

/**
* Returns the response to the request or {@code null}, if no response has
* arrived yet.
* arrived yet.
*
* If there is an observe relation, the last received
* notification is the response on the client side. On the server side, that
* is the last notification to be sent, but may differ from the current
* response, if that is in transit.
* If there is an observe relation, the last received notification is the
* response on the client side. On the server side, that is the last
* notification to be sent, but may differ from the current response, if
* that is in transit.
*
* @return the response. or {@code null},
*/
Expand Down Expand Up @@ -878,7 +918,8 @@ public int incrementFailedTransmissionCount() {
}

/**
* Get timeout scale factor for exponential back-off between retransmissions.
* Get timeout scale factor for exponential back-off between
* retransmissions.
*
* @return timeout scale factor for exponential back-off.
* @since 3.0
Expand Down Expand Up @@ -1049,8 +1090,9 @@ public Throwable getCaller() {
* <p>
* This means that both request and response have been sent/received.
* <p>
* This method invokes the {@linkplain RemoveHandler#remove(Exchange, KeyToken, KeyMID)
* remove} method on the observer registered on this exchange (if any).
* This method invokes the
* {@linkplain RemoveHandler#remove(Exchange, KeyToken, KeyMID) remove}
* method on the observer registered on this exchange (if any).
* <p>
* Call this method to trigger a clean-up in the Matcher through its
* ExchangeObserverImpl. Usually, it is called automatically when reaching
Expand Down Expand Up @@ -1310,8 +1352,9 @@ public void removeNotifications() {
* exchange to increase security when matching an incoming response to this
* exchange's request.
* </p>
* If a {@link #setEndpointContextPreOperator(EndpointContextOperator)} is used,
* this pre-operator is called before the endpoint context is set and forwarded.
* If a {@link #setEndpointContextPreOperator(EndpointContextOperator)} is
* used, this pre-operator is called before the endpoint context is set and
* forwarded.
*
* @param ctx the endpoint context information
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/********************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* https://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v1.0 which is available at
* https://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
********************************************************************************/
package org.eclipse.californium.elements.util;

import java.util.ConcurrentModificationException;
import java.util.concurrent.Executor;

/**
* Checked owner executor.
*
* @since 3.9
*/
public interface CheckedExecutor extends Executor {

/**
* Assert, that the current thread executes the current job.
*
* @throws ConcurrentModificationException if current thread doesn't execute
* the current job
*/
void assertOwner();

/**
* Check, if current thread executes the current job.
*
* @return {@code true}, if current thread executes the current job,
* {@code false}, otherwise.
*/
boolean checkOwner();

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
*
* Serialize job execution before passing the jobs to a provided executor.
*/
public class SerialExecutor extends AbstractExecutorService {
public class SerialExecutor extends AbstractExecutorService implements CheckedExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(SerialExecutor.class);

Expand Down Expand Up @@ -119,12 +119,11 @@ public void execute(final Runnable command) {
}

/**
* Assert, that the current thread executes the
* {@link #currentlyExecutedJob}.
* {@inheritDoc}
*
* @throws ConcurrentModificationException if current thread doesn't execute
* the {@link #currentlyExecutedJob}.
*/
* {@link #currentlyExecutedJob} is used for the current job.
*/
@Override
public void assertOwner() {
final Thread me = Thread.currentThread();
if (owner.get() != me) {
Expand All @@ -138,11 +137,11 @@ public void assertOwner() {
}

/**
* Check, if current thread executes the {@link #currentlyExecutedJob}.
* {@inheritDoc}
*
* @return {@code true}, if current thread executes the
* {@link #currentlyExecutedJob}, {@code false}, otherwise.
*/
* {@link #currentlyExecutedJob} is used for the current job.
*/
@Override
public boolean checkOwner() {
return owner.get() == Thread.currentThread();
}
Expand Down