Skip to content

Commit

Permalink
Rename HiveCluster to MetastoreLocator
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Mar 11, 2019
1 parent 7875e3e commit 6eb271f
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,10 @@

import org.apache.thrift.TException;

/**
* A Hive cluster is a single logical installation of Hive. It might
* have multiple instances of the metastore service (for scalability
* purposes), but they would all return the same data.
* <p/>
* This Hive plugin only supports having a single Hive cluster per
* instantiation of the plugin, but a plugin that extends this code
* could support multiple, dynamically located Hive clusters.
*/
public interface HiveCluster
public interface MetastoreLocator
{
/**
* Create a connected {@link HiveMetastoreClient} to this HiveCluster
* Create a connected {@link HiveMetastoreClient}
*/
HiveMetastoreClient createMetastoreClient()
throws TException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,25 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public class StaticHiveCluster
implements HiveCluster
public class StaticMetastoreLocator
implements MetastoreLocator
{
private final List<HostAndPort> addresses;
private final HiveMetastoreClientFactory clientFactory;
private final String metastoreUsername;

@Inject
public StaticHiveCluster(StaticMetastoreConfig config, HiveMetastoreClientFactory clientFactory)
public StaticMetastoreLocator(StaticMetastoreConfig config, HiveMetastoreClientFactory clientFactory)
{
this(config.getMetastoreUris(), config.getMetastoreUsername(), clientFactory);
}

public StaticHiveCluster(List<URI> metastoreUris, String metastoreUsername, HiveMetastoreClientFactory clientFactory)
public StaticMetastoreLocator(List<URI> metastoreUris, String metastoreUsername, HiveMetastoreClientFactory clientFactory)
{
requireNonNull(metastoreUris, "metastoreUris is null");
checkArgument(!metastoreUris.isEmpty(), "metastoreUris must specify at least one URI");
this.addresses = metastoreUris.stream()
.map(StaticHiveCluster::checkMetastoreUri)
.map(StaticMetastoreLocator::checkMetastoreUri)
.map(uri -> HostAndPort.fromParts(uri.getHost(), uri.getPort()))
.collect(toList());
this.metastoreUsername = metastoreUsername;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public class ThriftHiveMetastore
implements HiveMetastore
{
private final ThriftHiveMetastoreStats stats;
private final HiveCluster clientProvider;
private final MetastoreLocator clientProvider;
private final Function<Exception, Exception> exceptionMapper;
private final double backoffScaleFactor;
private final Duration minBackoffDelay;
Expand All @@ -120,14 +120,14 @@ public class ThriftHiveMetastore
private final int maxRetries;

@Inject
public ThriftHiveMetastore(HiveCluster hiveCluster, ThriftHiveMetastoreConfig thriftConfig)
public ThriftHiveMetastore(MetastoreLocator metastoreLocator, ThriftHiveMetastoreConfig thriftConfig)
{
this(hiveCluster, new ThriftHiveMetastoreStats(), identity(), thriftConfig);
this(metastoreLocator, new ThriftHiveMetastoreStats(), identity(), thriftConfig);
}

public ThriftHiveMetastore(HiveCluster hiveCluster, ThriftHiveMetastoreStats stats, Function<Exception, Exception> exceptionMapper, ThriftHiveMetastoreConfig thriftConfig)
public ThriftHiveMetastore(MetastoreLocator metastoreLocator, ThriftHiveMetastoreStats stats, Function<Exception, Exception> exceptionMapper, ThriftHiveMetastoreConfig thriftConfig)
{
this.clientProvider = requireNonNull(hiveCluster, "hiveCluster is null");
this.clientProvider = requireNonNull(metastoreLocator, "metastoreLocator is null");
this.stats = requireNonNull(stats, "stats is null");
this.exceptionMapper = requireNonNull(exceptionMapper, "exceptionMapper is null");
this.backoffScaleFactor = thriftConfig.getBackoffScaleFactor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ThriftMetastoreModule
protected void setup(Binder binder)
{
binder.bind(HiveMetastoreClientFactory.class).in(Scopes.SINGLETON);
binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON);
binder.bind(MetastoreLocator.class).to(StaticMetastoreLocator.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(StaticMetastoreConfig.class);
configBinder(binder).bindConfig(ThriftHiveMetastoreConfig.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import io.prestosql.plugin.hive.metastore.StorageFormat;
import io.prestosql.plugin.hive.metastore.Table;
import io.prestosql.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.prestosql.plugin.hive.metastore.thrift.HiveCluster;
import io.prestosql.plugin.hive.metastore.thrift.TestingHiveCluster;
import io.prestosql.plugin.hive.metastore.thrift.MetastoreLocator;
import io.prestosql.plugin.hive.metastore.thrift.TestingMetastoreLocator;
import io.prestosql.plugin.hive.metastore.thrift.ThriftHiveMetastore;
import io.prestosql.plugin.hive.metastore.thrift.ThriftHiveMetastoreConfig;
import io.prestosql.plugin.hive.orc.OrcPageSource;
Expand Down Expand Up @@ -708,9 +708,9 @@ protected final void setup(String host, int port, String databaseName, String ti
hiveClientConfig.setMetastoreSocksProxy(HostAndPort.fromString(proxy));
}

HiveCluster hiveCluster = new TestingHiveCluster(hiveClientConfig, host, port);
MetastoreLocator metastoreLocator = new TestingMetastoreLocator(hiveClientConfig, host, port);
ExtendedHiveMetastore metastore = new CachingHiveMetastore(
new BridgingHiveMetastore(new ThriftHiveMetastore(hiveCluster, new ThriftHiveMetastoreConfig())),
new BridgingHiveMetastore(new ThriftHiveMetastore(metastoreLocator, new ThriftHiveMetastoreConfig())),
executor,
Duration.valueOf("1m"),
Duration.valueOf("15s"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import io.prestosql.plugin.hive.metastore.PrincipalPrivileges;
import io.prestosql.plugin.hive.metastore.Table;
import io.prestosql.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.prestosql.plugin.hive.metastore.thrift.HiveCluster;
import io.prestosql.plugin.hive.metastore.thrift.TestingHiveCluster;
import io.prestosql.plugin.hive.metastore.thrift.MetastoreLocator;
import io.prestosql.plugin.hive.metastore.thrift.TestingMetastoreLocator;
import io.prestosql.plugin.hive.metastore.thrift.ThriftHiveMetastore;
import io.prestosql.plugin.hive.metastore.thrift.ThriftHiveMetastoreConfig;
import io.prestosql.spi.connector.ColumnHandle;
Expand Down Expand Up @@ -155,15 +155,15 @@ protected void setup(String host, int port, String databaseName, Function<HiveCl
config.setMetastoreSocksProxy(HostAndPort.fromString(proxy));
}

HiveCluster hiveCluster = new TestingHiveCluster(config, host, port);
MetastoreLocator metastoreLocator = new TestingMetastoreLocator(config, host, port);
ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("hive-%s"));
HivePartitionManager hivePartitionManager = new HivePartitionManager(TYPE_MANAGER, config);

HdfsConfiguration hdfsConfiguration = hdfsConfigurationProvider.apply(config);

hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, config, new NoHdfsAuthentication());
metastoreClient = new TestingHiveMetastore(
new BridgingHiveMetastore(new ThriftHiveMetastore(hiveCluster, new ThriftHiveMetastoreConfig())),
new BridgingHiveMetastore(new ThriftHiveMetastore(metastoreLocator, new ThriftHiveMetastoreConfig())),
executor,
config,
getBasePath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.units.Duration;
import io.prestosql.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.prestosql.plugin.hive.metastore.thrift.HiveCluster;
import io.prestosql.plugin.hive.metastore.thrift.HiveMetastoreClient;
import io.prestosql.plugin.hive.metastore.thrift.MetastoreLocator;
import io.prestosql.plugin.hive.metastore.thrift.MockHiveMetastoreClient;
import io.prestosql.plugin.hive.metastore.thrift.ThriftHiveMetastore;
import io.prestosql.plugin.hive.metastore.thrift.ThriftHiveMetastoreConfig;
Expand Down Expand Up @@ -55,9 +55,9 @@ public class TestCachingHiveMetastore
public void setUp()
{
mockClient = new MockHiveMetastoreClient();
MockHiveCluster mockHiveCluster = new MockHiveCluster(mockClient);
MetastoreLocator metastoreLocator = new MockMetastoreLocator(mockClient);
ListeningExecutorService executor = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("test-%s")));
ThriftHiveMetastore thriftHiveMetastore = new ThriftHiveMetastore(mockHiveCluster, new ThriftHiveMetastoreConfig());
ThriftHiveMetastore thriftHiveMetastore = new ThriftHiveMetastore(metastoreLocator, new ThriftHiveMetastoreConfig());
metastore = new CachingHiveMetastore(
new BridgingHiveMetastore(thriftHiveMetastore),
executor,
Expand Down Expand Up @@ -257,12 +257,12 @@ public void testNoCacheExceptions()
assertEquals(mockClient.getAccessCount(), 2);
}

private static class MockHiveCluster
implements HiveCluster
private static class MockMetastoreLocator
implements MetastoreLocator
{
private final HiveMetastoreClient client;

private MockHiveCluster(HiveMetastoreClient client)
private MockMetastoreLocator(HiveMetastoreClient client)
{
this.client = client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
import java.util.List;
import java.util.Optional;

import static io.airlift.testing.Assertions.assertContains;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

public class TestStaticHiveCluster
public class TestStaticMetastoreLocator
{
private static final HiveMetastoreClient DEFAULT_CLIENT = createFakeMetastoreClient();
private static final HiveMetastoreClient FALLBACK_CLIENT = createFakeMetastoreClient();
Expand All @@ -50,61 +49,57 @@ public class TestStaticHiveCluster
public void testDefaultHiveMetastore()
throws TException
{
HiveCluster cluster = createHiveCluster(CONFIG_WITH_FALLBACK, singletonList(DEFAULT_CLIENT));
assertEquals(cluster.createMetastoreClient(), DEFAULT_CLIENT);
MetastoreLocator locator = createMetastoreLocator(CONFIG_WITH_FALLBACK, singletonList(DEFAULT_CLIENT));
assertEquals(locator.createMetastoreClient(), DEFAULT_CLIENT);
}

@Test
public void testFallbackHiveMetastore()
throws TException
{
HiveCluster cluster = createHiveCluster(CONFIG_WITH_FALLBACK, asList(null, null, FALLBACK_CLIENT));
assertEquals(cluster.createMetastoreClient(), FALLBACK_CLIENT);
MetastoreLocator locator = createMetastoreLocator(CONFIG_WITH_FALLBACK, asList(null, null, FALLBACK_CLIENT));
assertEquals(locator.createMetastoreClient(), FALLBACK_CLIENT);
}

@Test
public void testFallbackHiveMetastoreFails()
{
HiveCluster cluster = createHiveCluster(CONFIG_WITH_FALLBACK, asList(null, null, null));
assertCreateClientFails(cluster, "Failed connecting to Hive metastore: [default:8080, fallback:8090, fallback2:8090]");
MetastoreLocator locator = createMetastoreLocator(CONFIG_WITH_FALLBACK, asList(null, null, null));
assertCreateClientFails(locator, "Failed connecting to Hive metastore: [default:8080, fallback:8090, fallback2:8090]");
}

@Test
public void testMetastoreFailedWithoutFallback()
{
HiveCluster cluster = createHiveCluster(CONFIG_WITHOUT_FALLBACK, singletonList(null));
assertCreateClientFails(cluster, "Failed connecting to Hive metastore: [default:8080]");
MetastoreLocator locator = createMetastoreLocator(CONFIG_WITHOUT_FALLBACK, singletonList(null));
assertCreateClientFails(locator, "Failed connecting to Hive metastore: [default:8080]");
}

@Test
public void testFallbackHiveMetastoreWithHiveUser()
throws TException
{
HiveCluster cluster = createHiveCluster(CONFIG_WITH_FALLBACK_WITH_USER, asList(null, null, FALLBACK_CLIENT));
assertEquals(cluster.createMetastoreClient(), FALLBACK_CLIENT);
MetastoreLocator locator = createMetastoreLocator(CONFIG_WITH_FALLBACK_WITH_USER, asList(null, null, FALLBACK_CLIENT));
assertEquals(locator.createMetastoreClient(), FALLBACK_CLIENT);
}

@Test
public void testMetastoreFailedWithoutFallbackWithHiveUser()
{
HiveCluster cluster = createHiveCluster(CONFIG_WITHOUT_FALLBACK_WITH_USER, singletonList(null));
assertCreateClientFails(cluster, "Failed connecting to Hive metastore: [default:8080]");
MetastoreLocator locator = createMetastoreLocator(CONFIG_WITHOUT_FALLBACK_WITH_USER, singletonList(null));
assertCreateClientFails(locator, "Failed connecting to Hive metastore: [default:8080]");
}

private static void assertCreateClientFails(HiveCluster cluster, String message)
private static void assertCreateClientFails(MetastoreLocator locator, String message)
{
try {
cluster.createMetastoreClient();
fail("expected exception");
}
catch (TException e) {
assertContains(e.getMessage(), message);
}
assertThatThrownBy(locator::createMetastoreClient)
.hasCauseInstanceOf(TException.class)
.hasMessage(message);
}

private static HiveCluster createHiveCluster(StaticMetastoreConfig config, List<HiveMetastoreClient> clients)
private static MetastoreLocator createMetastoreLocator(StaticMetastoreConfig config, List<HiveMetastoreClient> clients)
{
return new StaticHiveCluster(config, new MockHiveMetastoreClientFactory(Optional.empty(), new Duration(1, SECONDS), clients));
return new StaticMetastoreLocator(config, new MockHiveMetastoreClientFactory(Optional.empty(), new Duration(1, SECONDS), clients));
}

private static HiveMetastoreClient createFakeMetastoreClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import static java.util.Objects.requireNonNull;

public class TestingHiveCluster
implements HiveCluster
public class TestingMetastoreLocator
implements MetastoreLocator
{
private final HiveClientConfig config;
private final HostAndPort address;

public TestingHiveCluster(HiveClientConfig config, String host, int port)
public TestingMetastoreLocator(HiveClientConfig config, String host, int port)
{
this.config = requireNonNull(config, "config is null");
this.address = HostAndPort.fromParts(requireNonNull(host, "host is null"), port);
Expand Down

0 comments on commit 6eb271f

Please sign in to comment.