diff --git a/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/send/SendResource.java b/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/send/SendResource.java index 5822b89086..64bcdfccf4 100644 --- a/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/send/SendResource.java +++ b/leshan-server-cf/src/main/java/org/eclipse/leshan/server/californium/send/SendResource.java @@ -60,7 +60,7 @@ public void handlePOST(CoapExchange exchange) { ClientProfile clientProfile = profileProvider.getProfile(sender); // check we have a registration for this identity - if (clientProfile == null) { + if (clientProfile.getRegistration() == null) { exchange.respond(ResponseCode.NOT_FOUND, "no registration found"); return; } diff --git a/leshan-server-core/src/main/java/org/eclipse/leshan/server/LeshanServerBuilder.java b/leshan-server-core/src/main/java/org/eclipse/leshan/server/LeshanServerBuilder.java index 9b4aebc12a..3788b677a5 100644 --- a/leshan-server-core/src/main/java/org/eclipse/leshan/server/LeshanServerBuilder.java +++ b/leshan-server-core/src/main/java/org/eclipse/leshan/server/LeshanServerBuilder.java @@ -40,6 +40,7 @@ import org.eclipse.leshan.server.model.StandardModelProvider; import org.eclipse.leshan.server.queue.ClientAwakeTimeProvider; import org.eclipse.leshan.server.queue.StaticClientAwakeTimeProvider; +import org.eclipse.leshan.server.registration.CustomInMemoryRegistrationStore; import org.eclipse.leshan.server.registration.InMemoryRegistrationStore; import org.eclipse.leshan.server.registration.RandomStringRegistrationIdProvider; import org.eclipse.leshan.server.registration.Registration; @@ -276,7 +277,7 @@ public LeshanServerBuilder setEndpointsProvider(LwM2mServerEndpointsProvider end */ public LeshanServer build() { if (registrationStore == null) - registrationStore = new InMemoryRegistrationStore(); + registrationStore = new CustomInMemoryRegistrationStore(); if (authorizer == null) authorizer = new DefaultAuthorizer(securityStore); if (modelProvider == null) diff --git a/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/CustomInMemoryRegistrationStore.java b/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/CustomInMemoryRegistrationStore.java new file mode 100644 index 0000000000..80fe0fbbe1 --- /dev/null +++ b/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/CustomInMemoryRegistrationStore.java @@ -0,0 +1,551 @@ +/******************************************************************************* + * Copyright (c) 2022 Sierra Wireless and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v20.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.html. + * + * Contributors: + * Sierra Wireless - initial API and implementation + *******************************************************************************/ +package org.eclipse.leshan.server.registration; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.eclipse.leshan.core.Destroyable; +import org.eclipse.leshan.core.Startable; +import org.eclipse.leshan.core.Stoppable; +import org.eclipse.leshan.core.observation.CompositeObservation; +import org.eclipse.leshan.core.observation.Observation; +import org.eclipse.leshan.core.observation.ObservationIdentifier; +import org.eclipse.leshan.core.observation.SingleObservation; +import org.eclipse.leshan.core.request.Identity; +import org.eclipse.leshan.core.util.NamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An in memory store for registration and observation. + */ +public class CustomInMemoryRegistrationStore implements RegistrationStore, Startable, Stoppable, Destroyable { + private final Logger LOG = LoggerFactory.getLogger(InMemoryRegistrationStore.class); + + // Data structure + private final Map regsByEp = new HashMap<>(); + private final Map regsByAddr = new HashMap<>(); + private final Map regsByRegId = new HashMap<>(); + private final Map regsByIdentity = new HashMap<>(); + private final Map obsByToken = new HashMap<>(); + private final Map> tokensByRegId = new HashMap<>(); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + // Listener use to notify when a registration expires + private ExpirationListener expirationListener; + + private final ScheduledExecutorService schedExecutor; + private ScheduledFuture cleanerTask; + private boolean started = false; + private final long cleanPeriod; // in seconds + + public CustomInMemoryRegistrationStore() { + this(2); // default clean period : 2s + } + + public CustomInMemoryRegistrationStore(long cleanPeriodInSec) { + this(Executors.newScheduledThreadPool(1, + new NamedThreadFactory(String.format("InMemoryRegistrationStore Cleaner (%ds)", cleanPeriodInSec))), + cleanPeriodInSec); + } + + public CustomInMemoryRegistrationStore(ScheduledExecutorService schedExecutor, long cleanPeriodInSec) { + this.schedExecutor = schedExecutor; + this.cleanPeriod = cleanPeriodInSec; + } + + /* *************** Leshan Registration API **************** */ + + @Override + public Deregistration addRegistration(Registration registration) { + String registrationidentifier = null; + + try { + lock.writeLock().lock(); + + if (registration.getIdentity().isPSK()) { + registrationidentifier = registration.getIdentity().getPskIdentity(); + } + if (registration.getIdentity().isRPK()) { + registrationidentifier = registration.getIdentity().getRawPublicKey().toString(); + } + if (registration.getIdentity().isX509()) { + registrationidentifier = registration.getIdentity().getX509CommonName(); + } + if (registration.getIdentity().isOSCORE()) { + registrationidentifier = registration.getIdentity().getOscoreIdentity().toString(); + } + if (!registration.getIdentity().isSecure()) { + registrationidentifier = registration.getIdentity().getPeerAddress().toString(); + } + + Registration registrationRemoved = regsByEp.put(registration.getEndpoint(), registration); + + regsByRegId.put(registration.getId(), registration); + regsByIdentity.put(registrationidentifier, registration); + // If a registration is already associated to this address we don't care as we only want to keep the most + // recent binding. + regsByAddr.put(registration.getSocketAddress(), registration); + if (registrationRemoved != null) { + Collection observationsRemoved = unsafeRemoveAllObservations(registrationRemoved.getId()); + if (!registrationRemoved.getSocketAddress().equals(registration.getSocketAddress())) { + removeFromMap(regsByAddr, registrationRemoved.getSocketAddress(), registrationRemoved); + } + if (!registrationRemoved.getId().equals(registration.getId())) { + removeFromMap(regsByRegId, registrationRemoved.getId(), registrationRemoved); + } + if (!(registrationRemoved.getIdentity().getPskIdentity()) + .equals(registration.getIdentity().getPskIdentity())) { + removeFromMap(regsByIdentity, registrationidentifier, registrationRemoved); + // removeFromMap(regsByIdentity, + // registrationRemoved.getIdentity().getPskIdentity(),registrationRemoved); + } + return new Deregistration(registrationRemoved, observationsRemoved); + } + } finally { + lock.writeLock().unlock(); + } + return null; + } + + @Override + public UpdatedRegistration updateRegistration(RegistrationUpdate update) { + try { + lock.writeLock().lock(); + + Registration registration = getRegistration(update.getRegistrationId()); + if (registration == null) { + return null; + } else { + Registration updatedRegistration = update.update(registration); + regsByEp.put(updatedRegistration.getEndpoint(), updatedRegistration); + // If registration is already associated to this address we don't care as we only want to keep the most + // recent binding. + regsByAddr.put(updatedRegistration.getSocketAddress(), updatedRegistration); + if (!registration.getSocketAddress().equals(updatedRegistration.getSocketAddress())) { + removeFromMap(regsByAddr, registration.getSocketAddress(), registration); + } + + if (!registration.getIdentity().equals(updatedRegistration.getIdentity())) { + if (updatedRegistration.getIdentity().isPSK()) { + regsByIdentity.put(updatedRegistration.getIdentity().getPskIdentity(), updatedRegistration); + removeFromMap(regsByIdentity, registration.getIdentity().getPskIdentity(), registration); + } + if (updatedRegistration.getIdentity().isRPK()) { + regsByIdentity.put(updatedRegistration.getIdentity().getRawPublicKey().toString(), + updatedRegistration); + removeFromMap(regsByIdentity, registration.getIdentity().getRawPublicKey().toString(), + registration); + } + if (updatedRegistration.getIdentity().isX509()) { + regsByIdentity.put(updatedRegistration.getIdentity().getX509CommonName(), updatedRegistration); + removeFromMap(regsByIdentity, registration.getIdentity().getX509CommonName(), registration); + } + if (updatedRegistration.getIdentity().isOSCORE()) { + regsByIdentity.put(updatedRegistration.getIdentity().getOscoreIdentity().toString(), + updatedRegistration); + removeFromMap(regsByIdentity, registration.getIdentity().getOscoreIdentity().toString(), + registration); + } + if (!updatedRegistration.getIdentity().isSecure()) { + regsByIdentity.put(updatedRegistration.getIdentity().getPeerAddress().toString(), + updatedRegistration); + removeFromMap(regsByIdentity, registration.getIdentity().getPeerAddress().toString(), + registration); + } + } + + regsByRegId.put(updatedRegistration.getId(), updatedRegistration); + + return new UpdatedRegistration(registration, updatedRegistration); + } + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public Registration getRegistration(String registrationId) { + try { + lock.readLock().lock(); + return regsByRegId.get(registrationId); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Registration getRegistrationByEndpoint(String endpoint) { + try { + lock.readLock().lock(); + return regsByEp.get(endpoint); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Registration getRegistrationByAdress(InetSocketAddress address) { + try { + lock.readLock().lock(); + return regsByAddr.get(address); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Registration getRegistrationByIdentity(Identity identity) { + try { + lock.readLock().lock(); + if (identity.isPSK()) { + return regsByIdentity.get(identity.getPskIdentity()); + } + if (identity.isRPK()) { + return regsByIdentity.get(identity.getRawPublicKey().toString()); + } + if (identity.isX509()) { + return regsByIdentity.get(identity.getX509CommonName()); + } + if (identity.isOSCORE()) { + return regsByIdentity.get(identity.getOscoreIdentity().toString()); + } else { + return regsByIdentity.get(identity.getPeerAddress().toString()); + } + + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Iterator getAllRegistrations() { + try { + lock.readLock().lock(); + return new ArrayList<>(regsByEp.values()).iterator(); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Deregistration removeRegistration(String registrationId) { + try { + lock.writeLock().lock(); + + Registration registration = getRegistration(registrationId); + if (registration != null) { + Collection observationsRemoved = unsafeRemoveAllObservations(registration.getId()); + regsByEp.remove(registration.getEndpoint()); + removeFromMap(regsByAddr, registration.getSocketAddress(), registration); + removeFromMap(regsByRegId, registration.getId(), registration); + + if (registration.getIdentity().isPSK()) { + removeFromMap(regsByIdentity, registration.getIdentity().getPskIdentity(), registration); + } + if (registration.getIdentity().isRPK()) { + removeFromMap(regsByIdentity, registration.getIdentity().getRawPublicKey().toString(), + registration); + } + if (registration.getIdentity().isX509()) { + removeFromMap(regsByIdentity, registration.getIdentity().getX509CommonName(), registration); + } + if (registration.getIdentity().isOSCORE()) { + removeFromMap(regsByIdentity, registration.getIdentity().getOscoreIdentity().toString(), + registration); + } + if (!registration.getIdentity().isSecure()) { + removeFromMap(regsByIdentity, registration.getIdentity().getPeerAddress().toString(), registration); + } + + return new Deregistration(registration, observationsRemoved); + } + return null; + } finally { + lock.writeLock().unlock(); + } + } + + /* *************** Leshan Observation API **************** */ + + @Override + public Collection addObservation(String registrationId, Observation observation, boolean addIfAbsent) { + List removed = new ArrayList<>(); + try { + lock.writeLock().lock(); + + if (!regsByRegId.containsKey(registrationId)) { + throw new IllegalStateException(String.format( + "can not add observation %s there is no registration with id %s", observation, registrationId)); + } + + Observation previousObservation; + ObservationIdentifier id = observation.getId(); + + if (addIfAbsent) { + if (!obsByToken.containsKey(id)) + previousObservation = obsByToken.put(id, observation); + else + previousObservation = obsByToken.get(id); + } else { + previousObservation = obsByToken.put(id, observation); + } + if (!tokensByRegId.containsKey(registrationId)) { + tokensByRegId.put(registrationId, new HashSet()); + } + tokensByRegId.get(registrationId).add(id); + + // log any collisions + if (previousObservation != null) { + removed.add(previousObservation); + LOG.warn("Token collision ? observation [{}] will be replaced by observation [{}] ", + previousObservation, observation); + } + + // cancel existing observations for the same path and registration id. + for (Observation obs : unsafeGetObservations(registrationId)) { + if (areTheSamePaths(observation, obs) && !observation.getId().equals(obs.getId())) { + unsafeRemoveObservation(obs.getId()); + removed.add(obs); + } + } + } finally { + lock.writeLock().unlock(); + } + + return removed; + } + + private boolean areTheSamePaths(Observation observation, Observation obs) { + if (observation instanceof SingleObservation && obs instanceof SingleObservation) { + return ((SingleObservation) observation).getPath().equals(((SingleObservation) obs).getPath()); + } + if (observation instanceof CompositeObservation && obs instanceof CompositeObservation) { + return ((CompositeObservation) observation).getPaths().equals(((CompositeObservation) obs).getPaths()); + } + return false; + } + + @Override + public Observation removeObservation(String registrationId, ObservationIdentifier observationId) { + try { + lock.writeLock().lock(); + Observation observation = unsafeGetObservation(observationId); + if (observation != null && registrationId.equals(observation.getRegistrationId())) { + unsafeRemoveObservation(observationId); + return observation; + } + return null; + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public Observation getObservation(String registrationId, ObservationIdentifier observationId) { + try { + lock.readLock().lock(); + Observation observation = unsafeGetObservation(observationId); + if (observation != null && registrationId.equals(observation.getRegistrationId())) { + return observation; + } + return null; + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Observation getObservation(ObservationIdentifier observationId) { + try { + lock.readLock().lock(); + Observation observation = unsafeGetObservation(observationId); + if (observation != null) { + return observation; + } + return null; + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Collection getObservations(String registrationId) { + try { + lock.readLock().lock(); + return unsafeGetObservations(registrationId); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Collection removeObservations(String registrationId) { + try { + lock.writeLock().lock(); + return unsafeRemoveAllObservations(registrationId); + } finally { + lock.writeLock().unlock(); + } + } + + /* *************** Observation utility functions **************** */ + + private Observation unsafeGetObservation(ObservationIdentifier token) { + Observation obs = obsByToken.get(token); + return obs; + } + + private void unsafeRemoveObservation(ObservationIdentifier observationId) { + Observation removed = obsByToken.remove(observationId); + + if (removed != null) { + String registrationId = removed.getRegistrationId(); + Set tokens = tokensByRegId.get(registrationId); + tokens.remove(observationId); + if (tokens.isEmpty()) { + tokensByRegId.remove(registrationId); + } + } + } + + private Collection unsafeRemoveAllObservations(String registrationId) { + Collection removed = new ArrayList<>(); + Set ids = tokensByRegId.get(registrationId); + if (ids != null) { + for (ObservationIdentifier id : ids) { + Observation observationRemoved = obsByToken.remove(id); + if (observationRemoved != null) { + removed.add(observationRemoved); + } + } + } + tokensByRegId.remove(registrationId); + return removed; + } + + private Collection unsafeGetObservations(String registrationId) { + Collection result = new ArrayList<>(); + Set ids = tokensByRegId.get(registrationId); + if (ids != null) { + for (ObservationIdentifier id : ids) { + Observation obs = unsafeGetObservation(id); + if (obs != null) { + result.add(obs); + } + } + } + return result; + } + /* *************** Expiration handling **************** */ + + @Override + public void setExpirationListener(ExpirationListener listener) { + this.expirationListener = listener; + } + + /** + * start the registration store, will start regular cleanup of dead registrations. + */ + @Override + public synchronized void start() { + if (!started) { + started = true; + cleanerTask = schedExecutor.scheduleAtFixedRate(new Cleaner(), cleanPeriod, cleanPeriod, TimeUnit.SECONDS); + } + } + + /** + * Stop the underlying cleanup of the registrations. + */ + @Override + public synchronized void stop() { + if (started) { + started = false; + if (cleanerTask != null) { + cleanerTask.cancel(false); + cleanerTask = null; + } + } + } + + /** + * Destroy "cleanup" scheduler. + */ + @Override + public synchronized void destroy() { + started = false; + schedExecutor.shutdownNow(); + try { + schedExecutor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Destroying InMemoryRegistrationStore was interrupted.", e); + } + } + + private class Cleaner implements Runnable { + + @Override + public void run() { + try { + Collection allRegs = new ArrayList<>(); + try { + lock.readLock().lock(); + allRegs.addAll(regsByEp.values()); + } finally { + lock.readLock().unlock(); + } + + for (Registration reg : allRegs) { + if (!reg.isAlive()) { + // force de-registration + Deregistration removedRegistration = removeRegistration(reg.getId()); + expirationListener.registrationExpired(removedRegistration.getRegistration(), + removedRegistration.getObservations()); + } + } + } catch (Exception e) { + LOG.warn("Unexpected Exception while registration cleaning", e); + } + } + } + + // boolean remove(Object key, Object value) exist only since java8 + // So this method is here only while we want to support java 7 + protected boolean removeFromMap(Map map, K key, V value) { + if (map.containsKey(key) && Objects.equals(map.get(key), value)) { + map.remove(key); + return true; + } else + return false; + } +}