diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 1ba31db4c43..2d8be674495 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -103,6 +103,7 @@ 2.12.0 2.9.0 1.4.2 + 2.0.0 1.3.6 3.1.15 0.8.0 @@ -233,6 +234,11 @@ consul-api ${consul_version} + + com.pszymczyk.consul + embedded-consul + ${consul_process_version} + com.googlecode.xmemcached xmemcached diff --git a/dubbo-registry/dubbo-registry-consul/pom.xml b/dubbo-registry/dubbo-registry-consul/pom.xml index ec32dbe4340..0b40fbf0b06 100644 --- a/dubbo-registry/dubbo-registry-consul/pom.xml +++ b/dubbo-registry/dubbo-registry-consul/pom.xml @@ -36,6 +36,11 @@ com.ecwid.consul consul-api + + com.pszymczyk.consul + embedded-consul + test + diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java index f9a30f3fee1..888faa47ca5 100644 --- a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java +++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java @@ -32,10 +32,12 @@ import com.ecwid.consul.v1.catalog.CatalogServicesRequest; import com.ecwid.consul.v1.health.HealthServicesRequest; import com.ecwid.consul.v1.health.model.HealthService; +import org.apache.dubbo.rpc.RpcException; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -155,6 +157,24 @@ public void doUnsubscribe(URL url, NotifyListener listener) { notifier.stop(); } + @Override + public List lookup(URL url) { + if (url == null) { + throw new IllegalArgumentException("lookup url == null"); + } + try { + String service = url.getServiceKey(); + Response> result = client.getHealthServices(service, HealthServicesRequest.newBuilder().setTag(SERVICE_TAG).build()); + if (result == null || result.getValue() == null || result.getValue().isEmpty()) { + return new ArrayList<>(); + } else { + return convert(result.getValue()); + } + } catch (Throwable e) { + throw new RpcException("Failed to lookup " + url + " from consul " + getUrl() + ", cause: " + e.getMessage(), e); + } + } + @Override public boolean isAvailable() { return client.getAgentSelf() != null; diff --git a/dubbo-registry/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java b/dubbo-registry/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java new file mode 100644 index 00000000000..08203f37b5d --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/src/test/java/org/apache/dubbo/registry/consul/ConsulRegistryTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.registry.consul; + +import com.pszymczyk.consul.ConsulProcess; +import com.pszymczyk.consul.ConsulStarterBuilder; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.status.Status; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.Registry; +import org.apache.dubbo.registry.status.RegistryStatusChecker; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; + +public class ConsulRegistryTest { + + private static ConsulProcess consul; + private ConsulRegistry consulRegistry; + private String service = "org.apache.dubbo.test.injvmServie"; + private URL serviceUrl = URL.valueOf("consul://127.0.0.1:8012/" + service + "?notify=false&methods=test1,test2"); + private URL registryUrl; + private ConsulRegistryFactory consulRegistryFactory; + + @BeforeEach + public void setUp() throws Exception { + this.consul = ConsulStarterBuilder.consulStarter() + .build() + .start(); + this.registryUrl = URL.valueOf("consul://localhost:" + consul.getHttpPort()); + + consulRegistryFactory = new ConsulRegistryFactory(); + this.consulRegistry = (ConsulRegistry) consulRegistryFactory.createRegistry(registryUrl); + } + + @AfterEach + public void tearDown() throws Exception { + consul.close(); + this.consulRegistry.destroy(); + } + + @Test + public void testRegister() { + Set registered; + + for (int i = 0; i < 2; i++) { + consulRegistry.register(serviceUrl); + registered = consulRegistry.getRegistered(); + assertThat(registered.contains(serviceUrl), is(true)); + } + + registered = consulRegistry.getRegistered(); + + assertThat(registered.size(), is(1)); + } + + @Test + public void testSubscribe() { + NotifyListener listener = mock(NotifyListener.class); + consulRegistry.subscribe(serviceUrl, listener); + + Map> subscribed = consulRegistry.getSubscribed(); + assertThat(subscribed.size(), is(1)); + assertThat(subscribed.get(serviceUrl).size(), is(1)); + + consulRegistry.unsubscribe(serviceUrl, listener); + subscribed = consulRegistry.getSubscribed(); + assertThat(subscribed.size(), is(1)); + assertThat(subscribed.get(serviceUrl).size(), is(0)); + } + + @Test + public void testAvailable() { + consulRegistry.register(serviceUrl); + assertThat(consulRegistry.isAvailable(), is(true)); + +// consulRegistry.destroy(); +// assertThat(consulRegistry.isAvailable(), is(false)); + } + + @Test + public void testLookup() throws InterruptedException { + List lookup = consulRegistry.lookup(serviceUrl); + assertThat(lookup.size(), is(0)); + + consulRegistry.register(serviceUrl); + Thread.sleep(5000); + lookup = consulRegistry.lookup(serviceUrl); + assertThat(lookup.size(), is(1)); + } + + @Test + public void testStatusChecker() { + RegistryStatusChecker registryStatusChecker = new RegistryStatusChecker(); + Status status = registryStatusChecker.check(); + assertThat(status.getLevel(), is(Status.Level.UNKNOWN)); + + Registry registry = consulRegistryFactory.getRegistry(registryUrl); + assertThat(registry, not(nullValue())); + + status = registryStatusChecker.check(); + assertThat(status.getLevel(), is(Status.Level.OK)); + + registry.register(serviceUrl); + status = registryStatusChecker.check(); + assertThat(status.getLevel(), is(Status.Level.OK)); + } + +}