Skip to content

Commit

Permalink
Regroup event on WriteComposite at client side.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Mar 5, 2021
1 parent f98311f commit 6432a4b
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public List<Integer> getAvailableResourceIds(int instanceId) {
@Override
public synchronized CreateResponse create(ServerIdentity identity, CreateRequest request) {
try {
beginTransaction();
beginTransaction(LwM2mPath.OBJECT_DEPTH);

if (!identity.isSystem()) {
if (id == LwM2mId.SECURITY) {
Expand All @@ -133,7 +133,7 @@ public synchronized CreateResponse create(ServerIdentity identity, CreateRequest
return doCreate(identity, request);

} finally {
endTransaction();
endTransaction(LwM2mPath.OBJECT_DEPTH);
}
}

Expand Down Expand Up @@ -183,7 +183,7 @@ protected ReadResponse doRead(ServerIdentity identity, ReadRequest request) {
@Override
public synchronized WriteResponse write(ServerIdentity identity, WriteRequest request) {
try {
beginTransaction();
beginTransaction(LwM2mPath.OBJECT_DEPTH);

LwM2mPath path = request.getPath();

Expand Down Expand Up @@ -234,7 +234,7 @@ public synchronized WriteResponse write(ServerIdentity identity, WriteRequest re

return doWrite(identity, request);
} finally {
endTransaction();
endTransaction(LwM2mPath.OBJECT_DEPTH);
}
}

Expand Down Expand Up @@ -476,12 +476,14 @@ public void removeListener(ObjectListener listener) {
transactionalListener.removeListener(listener);
}

protected void beginTransaction() {
transactionalListener.beginTransaction();
@Override
public synchronized void beginTransaction(byte level) {
transactionalListener.beginTransaction(level);
}

protected void endTransaction() {
transactionalListener.endTransaction();
@Override
public synchronized void endTransaction(byte level) {
transactionalListener.endTransaction(level);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,9 @@ public interface LwM2mObjectEnabler {

void setLwM2mClient(LwM2mClient client);

void beginTransaction(byte level);

void endTransaction(byte level);

ContentFormat getDefaultEncodingFormat(DownlinkRequest<?> request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.leshan.core.Destroyable;
import org.eclipse.leshan.core.Startable;
import org.eclipse.leshan.core.Stoppable;
import org.eclipse.leshan.core.node.LwM2mPath;

/**
* The LWM2M Object Tree.
Expand Down Expand Up @@ -148,4 +149,17 @@ public void resourceChanged(LwM2mObjectEnabler object, int instanceId, int... re
}
}
}

public void beginTransaction(Map<Integer, LwM2mObjectEnabler> enablers) {
for (LwM2mObjectEnabler enabler : enablers.values()) {
enabler.beginTransaction(LwM2mPath.ROOT_DEPTH);
}
}

public void endTransaction(Map<Integer, LwM2mObjectEnabler> enablers) {
for (LwM2mObjectEnabler enabler : enablers.values()) {
enabler.endTransaction(LwM2mPath.ROOT_DEPTH);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -178,37 +178,45 @@ public WriteCompositeResponse write(ServerIdentity identity, WriteCompositeReque

// TODO all of this should be done in an atomic way. So I suppose if we want to support this we need to add a
// kind of transaction mechanism with rollback feature and also a way to lock objectTree?
// current transaction mechanism is just about regroup event and so observe notification.

// Write Nodes
for (Entry<LwM2mPath, LwM2mNode> entry : request.getNodes().entrySet()) {
// Get corresponding object enabler
LwM2mPath path = entry.getKey();
LwM2mNode node = entry.getValue();
LwM2mObjectEnabler objectEnabler = enablers.get(path.getObjectId());

// WriteComposite is about patching so we need to use write UPDATE Mode which is only available on instance.
// So we create an instance from node
LwM2mObjectInstance instance;
if (node instanceof LwM2mResource) {
instance = new LwM2mObjectInstance(path.getObjectInstanceId(), (LwM2mResource) node);
} else if (node instanceof LwM2mResourceInstance) {
LwM2mResourceInstance resourceInstance = (LwM2mResourceInstance) node;
instance = new LwM2mObjectInstance(path.getObjectInstanceId(),
new LwM2mMultipleResource(path.getResourceId(), resourceInstance.getType(), resourceInstance));
} else {
return WriteCompositeResponse.internalServerError(
String.format("node %s should be a resource or a resource instance, not a %s", path,
node.getClass().getSimpleName()));
}
try {
tree.beginTransaction(enablers);
for (Entry<LwM2mPath, LwM2mNode> entry : request.getNodes().entrySet()) {
// Get corresponding object enabler
LwM2mPath path = entry.getKey();
LwM2mNode node = entry.getValue();
LwM2mObjectEnabler objectEnabler = enablers.get(path.getObjectId());

// WriteComposite is about patching so we need to use write UPDATE Mode
// which is only available on instance.
// So we create an instance from resource or resource instance.
LwM2mObjectInstance instance;
if (node instanceof LwM2mResource) {
instance = new LwM2mObjectInstance(path.getObjectInstanceId(), (LwM2mResource) node);
} else if (node instanceof LwM2mResourceInstance) {
LwM2mResourceInstance resourceInstance = (LwM2mResourceInstance) node;
instance = new LwM2mObjectInstance(path.getObjectInstanceId(), new LwM2mMultipleResource(
path.getResourceId(), resourceInstance.getType(), resourceInstance));
} else {
return WriteCompositeResponse.internalServerError(
String.format("node %s should be a resource or a resource instance, not a %s", path,
node.getClass().getSimpleName()));
}

WriteResponse response = objectEnabler.write(identity, new WriteRequest(Mode.UPDATE,
request.getContentFormat(), path.toObjectInstancePath(), instance, request.getCoapRequest()));
WriteResponse response = objectEnabler.write(identity, new WriteRequest(Mode.UPDATE,
request.getContentFormat(), path.toObjectInstancePath(), instance, request.getCoapRequest()));

if (response.isFailure()) {
return new WriteCompositeResponse(response.getCode(), response.getErrorMessage(), null);
if (response.isFailure()) {
return new WriteCompositeResponse(response.getCode(), response.getErrorMessage(), null);
}
}
} finally {
tree.endTransaction(enablers);
}
return WriteCompositeResponse.success();

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Map;

import org.eclipse.leshan.client.resource.listener.ObjectListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An {@link ObjectListener} which is able to store notification during transaction and raise all grouped event at the
Expand All @@ -29,7 +31,10 @@
* This class is not threadsafe.
*/
public class TransactionalObjectListener implements ObjectListener {
protected boolean inTransaction = false;

private static Logger LOG = LoggerFactory.getLogger(TransactionalObjectListener.class);

protected int currentLevel = 0;
protected List<Integer> instancesAdded = new ArrayList<>();
protected List<Integer> instancesRemoved = new ArrayList<>();
protected Map<Integer, List<Integer>> resourcesChangedByInstance = new HashMap<>();
Expand All @@ -49,16 +54,46 @@ public void removeListener(ObjectListener listener) {
innerListeners.remove(listener);
}

public void beginTransaction() {
inTransaction = true;
/**
* Open a transaction with a given level. Same level must be used to open and close a transaction.
* <p>
* a transaction can be started in another transaction but in that case the inner transaction should use a higher
* level.
*
* @param level the transaction level, a not 0 positive integer.
*/
public void beginTransaction(byte level) {
if (level <= 0) {
throw new IllegalArgumentException("level must be > 0.");
}
if (currentLevel == 0) {
currentLevel = level;
} else if (level <= currentLevel) {
LOG.warn(
"Begin transaction with a lower level {} than the current one {} for object {}, this could bring to unexpected behavior",
objectEnabler.getId(), currentLevel, level);
currentLevel = level;
}
// else if level > currentLevel
// there is nothing to do has this transaction is inner a another one.
}

public void endTransaction(byte level) {
if (currentLevel == level) {
try {
fireStoredEvents();
} catch (Exception e) {
LOG.warn("Exception raised when we fired Event about object {}", objectEnabler.getId(), e);
}
instancesAdded.clear();
instancesRemoved.clear();
resourcesChangedByInstance.clear();
currentLevel = 0;
}
}

public void endTransaction() {
fireStoredEvents();
instancesAdded.clear();
instancesRemoved.clear();
resourcesChangedByInstance.clear();
inTransaction = false;
protected boolean inTransaction() {
return currentLevel > 0;
}

protected void fireStoredEvents() {
Expand All @@ -74,7 +109,7 @@ protected void fireStoredEvents() {

@Override
public void objectInstancesAdded(LwM2mObjectEnabler object, int... instanceIds) {
if (!inTransaction) {
if (!inTransaction()) {
fireObjectInstancesAdded(instanceIds);
} else {
// store additions
Expand All @@ -90,7 +125,7 @@ public void objectInstancesAdded(LwM2mObjectEnabler object, int... instanceIds)

@Override
public void objectInstancesRemoved(LwM2mObjectEnabler object, int... instanceIds) {
if (!inTransaction) {
if (!inTransaction()) {
fireObjectInstancesRemoved(instanceIds);
} else {
// store deletion
Expand All @@ -106,7 +141,7 @@ public void objectInstancesRemoved(LwM2mObjectEnabler object, int... instanceIds

@Override
public void resourceChanged(LwM2mObjectEnabler object, int instanceId, int... resourcesIds) {
if (!inTransaction) {
if (!inTransaction()) {
fireResourcesChanged(instanceId, resourcesIds);
} else {
List<Integer> resourcesChanged = resourcesChangedByInstance.get(instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
*/
public class LwM2mPath {

public static final byte ROOT_DEPTH = 1;
public static final byte OBJECT_DEPTH = 2;
public static final byte OBJECT_INSTANCE_DEPTH = 3;
public static final byte RESOURCE_DEPTH = 4;
public static final byte RESOURCE_INSTANCE_DEPTH = 5;

private final Integer objectId;
private final Integer objectInstanceId;
private final Integer resourceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.response.ObserveResponse;
Expand All @@ -13,6 +14,7 @@ public class TestObservationListener implements ObservationListener {

private CountDownLatch latch = new CountDownLatch(1);
private final AtomicBoolean receivedNotify = new AtomicBoolean();
private AtomicInteger counter = new AtomicInteger(0);
private ObserveResponse response;
private Exception error;

Expand All @@ -21,6 +23,7 @@ public void onResponse(Observation observation, Registration registration, Obser
receivedNotify.set(true);
this.response = response;
this.error = null;
this.counter.incrementAndGet();
latch.countDown();
}

Expand Down Expand Up @@ -57,10 +60,15 @@ public void waitForNotification(long timeout) throws InterruptedException {
latch.await(timeout, TimeUnit.MILLISECONDS);
}

public int getNotificationCount() {
return counter.get();
}

public void reset() {
latch = new CountDownLatch(1);
receivedNotify.set(false);
response = null;
error = null;
this.counter = new AtomicInteger(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ public CreateResponse create(ServerIdentity identity, CreateRequest request) {
public void addListener(ObjectListener listener) {
nodeEnabler.addListener(listener);
}

@Override
public void beginTransaction(byte level) {
nodeEnabler.beginTransaction(level);
}

@Override
public void endTransaction(byte level) {
nodeEnabler.endTransaction(level);
}
};
}
}
Loading

0 comments on commit 6432a4b

Please sign in to comment.