Skip to content

Commit

Permalink
REST API for GET,PUT and DELETE
Browse files Browse the repository at this point in the history
Signed-off-by: vamsi-amazon <reddyvam@amazon.com>
  • Loading branch information
vamsi-amazon committed Apr 4, 2023
1 parent 23cc0f6 commit e0dd50c
Show file tree
Hide file tree
Showing 30 changed files with 1,368 additions and 131 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.opensearch.sql.datasource;

import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;

/**
* Interface for DataSourceLoaderCache which provides methods for
* fetch, loading and invalidating DataSource cache.
*/
public interface DataSourceLoaderCache {

/**
* Returns cached datasource object or loads a new one if not present.
*
* @param dataSourceMetadata {@link DataSourceMetadata}.
* @return {@link DataSource}
*/
DataSource getOrLoadDataSource(DataSourceMetadata dataSourceMetadata);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.opensearch.sql.datasource;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.storage.DataSourceFactory;

/**
* Default implementation of DataSourceLoaderCache. This implementation
* utilizes Google Guava Cache {@link Cache} for caching DataSource objects
* against {@link DataSourceMetadata}. Expires the cache objects every 24 hrs after
* the last access.
*/
public class DataSourceLoaderCacheImpl implements DataSourceLoaderCache {
private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;
private final Cache<DataSourceMetadata, DataSource> dataSourceCache;

/**
* DataSourceLoaderCacheImpl constructor.
*
* @param dataSourceFactorySet set of {@link DataSourceFactory}.
*/
public DataSourceLoaderCacheImpl(Set<DataSourceFactory> dataSourceFactorySet) {
this.dataSourceFactoryMap = dataSourceFactorySet.stream()
.collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
this.dataSourceCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterAccess(24, TimeUnit.HOURS)
.build();
}

@Override
public DataSource getOrLoadDataSource(DataSourceMetadata dataSourceMetadata) {
DataSource dataSource = this.dataSourceCache.getIfPresent(dataSourceMetadata);
if (dataSource == null) {
dataSource = this.dataSourceFactoryMap.get(dataSourceMetadata.getConnector())
.createDataSource(dataSourceMetadata);
this.dataSourceCache.put(dataSourceMetadata, dataSource);
return dataSource;
}
return dataSource;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,21 @@ public interface DataSourceService {
* Returns all dataSource Metadata objects. The returned objects won't contain
* any of the credential info.
*
* @param isDefaultDataSourceRequired is used to specify
* if default opensearch connector is required in the output list.
* @return set of {@link DataSourceMetadata}.
*/
Set<DataSourceMetadata> getDataSourceMetadataSet();
Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSourceRequired);


/**
* Returns dataSourceMetadata object with specific name.
* The returned objects won't contain any crendetial info.
*
* @param name name of the {@link DataSource}.
* @return set of {@link DataSourceMetadata}.
*/
DataSourceMetadata getDataSourceMetadata(String name);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -17,7 +21,9 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.xml.crypto.Data;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
Expand All @@ -35,9 +41,7 @@ public class DataSourceServiceImpl implements DataSourceService {

private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";

private final ConcurrentHashMap<DataSourceMetadata, DataSource> dataSourceMap;

private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;
private final DataSourceLoaderCache dataSourceLoaderCache;

private final DataSourceMetadataStorage dataSourceMetadataStorage;

Expand All @@ -50,57 +54,80 @@ public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories,
DataSourceMetadataStorage dataSourceMetadataStorage,
DataSourceUserAuthorizationHelper
dataSourceUserAuthorizationHelper) {
dataSourceFactoryMap =
dataSourceFactories.stream()
.collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
dataSourceMap = new ConcurrentHashMap<>();
this.dataSourceMetadataStorage = dataSourceMetadataStorage;
this.dataSourceUserAuthorizationHelper = dataSourceUserAuthorizationHelper;
this.dataSourceLoaderCache = new DataSourceLoaderCacheImpl(dataSourceFactories);
}

@Override
public Set<DataSourceMetadata> getDataSourceMetadataSet() {
public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSourceRequired) {
List<DataSourceMetadata> dataSourceMetadataList
= this.dataSourceMetadataStorage.getDataSourceMetadata();
Set<DataSourceMetadata> dataSourceMetadataSet = new HashSet<>(dataSourceMetadataList);
dataSourceMetadataSet.add(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
if (isDefaultDataSourceRequired) {
dataSourceMetadataSet.add(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
}
removeAuthInfo(dataSourceMetadataSet);
return dataSourceMetadataSet;
}

@Override
public DataSourceMetadata getDataSourceMetadata(String datasourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional
= this.dataSourceMetadataStorage.getDataSourceMetadata(datasourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new IllegalArgumentException("DataSource with name: " + datasourceName
+ " doesn't exist.");
}
removeAuthInfo(dataSourceMetadataOptional.get());
return dataSourceMetadataOptional.get();
}


@Override
public DataSource getDataSource(String dataSourceName) {
Optional<DataSourceMetadata>
dataSourceMetadataOptional = getDataSourceMetadata(dataSourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new IllegalArgumentException(
throw new DataSourceNotFoundException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
} else {
DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get();
this.dataSourceUserAuthorizationHelper
.authorizeDataSource(dataSourceMetadata);
return getDataSourceFromMetadata(dataSourceMetadata);
return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
}
}

@Override
public void createDataSource(DataSourceMetadata metadata) {
validateDataSourceMetaData(metadata);
if (!metadata.getName().equals(DEFAULT_DATASOURCE_NAME)) {
this.dataSourceLoaderCache.getOrLoadDataSource(metadata);
this.dataSourceMetadataStorage.createDataSourceMetadata(metadata);
}
dataSourceMap.put(metadata,
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
}

@Override
public void updateDataSource(DataSourceMetadata dataSourceMetadata) {
throw new UnsupportedOperationException("will be supported in future");
validateDataSourceMetaData(dataSourceMetadata);
if (!dataSourceMetadata.getName().equals(DEFAULT_DATASOURCE_NAME)) {
this.dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
this.dataSourceMetadataStorage.updateDataSourceMetadata(dataSourceMetadata);
} else {
throw new UnsupportedOperationException(
"Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME);
}
}

@Override
public void deleteDataSource(String dataSourceName) {
throw new UnsupportedOperationException("will be supported in future");
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
throw new UnsupportedOperationException(
"Not allowed to delete default datasource :" + DEFAULT_DATASOURCE_NAME);
} else {
this.dataSourceMetadataStorage.deleteDataSourceMetadata(dataSourceName);
}
}

@Override
Expand Down Expand Up @@ -138,19 +165,19 @@ private Optional<DataSourceMetadata> getDataSourceMetadata(String dataSourceName
}
}

private DataSource getDataSourceFromMetadata(DataSourceMetadata dataSourceMetadata) {
if (!dataSourceMap.containsKey(dataSourceMetadata)) {
clearDataSource(dataSourceMetadata);
dataSourceMap.put(dataSourceMetadata,
dataSourceFactoryMap.get(dataSourceMetadata.getConnector())
.createDataSource(dataSourceMetadata));
}
return dataSourceMap.get(dataSourceMetadata);
}

private void clearDataSource(DataSourceMetadata dataSourceMetadata) {
dataSourceMap.entrySet()
.removeIf(entry -> entry.getKey().getName().equals(dataSourceMetadata.getName()));
// It is advised to avoid sending any kind credential
// info in api response from security point of view.
private void removeAuthInfo(Set<DataSourceMetadata> dataSourceMetadataSet) {
dataSourceMetadataSet.forEach(this::removeAuthInfo);
}

private void removeAuthInfo(DataSourceMetadata dataSourceMetadata) {
HashMap<String, String> safeProperties
= new HashMap<>(dataSourceMetadata.getProperties());
safeProperties
.entrySet()
.removeIf(entry -> entry.getKey().contains("auth"));
dataSourceMetadata.setProperties(safeProperties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasource.exceptions;

/**
* DataSourceNotFoundException.
*/
public class DataSourceNotFoundException extends RuntimeException {
public DataSourceNotFoundException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public String explain() {
public void open() {
List<ExprValue> exprValues = new ArrayList<>();
Set<DataSourceMetadata> dataSourceMetadataSet
= dataSourceService.getDataSourceMetadataSet();
= dataSourceService.getDataSourceMetadata(true);
for (DataSourceMetadata dataSourceMetadata : dataSourceMetadataSet) {
exprValues.add(
new ExprTupleValue(new LinkedHashMap<>(ImmutableMap.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.opensearch.sql.storage;

import java.util.Map;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand All @@ -28,4 +29,5 @@ public interface DataSourceFactory {
* Create {@link DataSource}.
*/
DataSource createDataSource(DataSourceMetadata metadata);

}
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,18 @@ private class DefaultDataSourceService implements DataSourceService {


@Override
public Set<DataSourceMetadata> getDataSourceMetadataSet() {
public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSourceRequired) {
return Stream.of(opensearchDataSource, prometheusDataSource)
.map(ds -> new DataSourceMetadata(ds.getName(),
ds.getConnectorType(),Collections.emptyList(),
ImmutableMap.of())).collect(Collectors.toSet());
}

@Override
public DataSourceMetadata getDataSourceMetadata(String name) {
return null;
}

@Override
public void createDataSource(DataSourceMetadata metadata) {
throw new UnsupportedOperationException("unsupported operation");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.opensearch.sql.datasource;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.storage.DataSourceFactory;
import org.opensearch.sql.storage.StorageEngine;

@ExtendWith(MockitoExtension.class)
class DataSourceLoaderCacheImplTest {

@Mock
private DataSourceFactory dataSourceFactory;

@Mock
private StorageEngine storageEngine;

@BeforeEach
public void setup() {
lenient()
.doAnswer(
invocation -> {
DataSourceMetadata metadata = invocation.getArgument(0);
return new DataSource(metadata.getName(), metadata.getConnector(), storageEngine);
})
.when(dataSourceFactory)
.createDataSource(any());
when(dataSourceFactory.getDataSourceType()).thenReturn(DataSourceType.OPENSEARCH);
}

@Test
void testGetOrLoadDataSource() {
DataSourceLoaderCache dataSourceLoaderCache =
new DataSourceLoaderCacheImpl(Collections.singleton(dataSourceFactory));
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName("testDS");
dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH);
dataSourceMetadata.setAllowedRoles(Collections.emptyList());
dataSourceMetadata.setProperties(ImmutableMap.of());
DataSource dataSource = dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
verify(dataSourceFactory, times(1)).createDataSource(dataSourceMetadata);
Assertions.assertEquals(dataSource,
dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata));
verifyNoMoreInteractions(dataSourceFactory);
}

@Test
void testGetOrLoadDataSourceWithMetadataUpdate() {
DataSourceLoaderCache dataSourceLoaderCache =
new DataSourceLoaderCacheImpl(Collections.singleton(dataSourceFactory));
DataSourceMetadata dataSourceMetadata = getMetadata();
dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
dataSourceMetadata.setAllowedRoles(List.of("testDS_access"));
dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
verify(dataSourceFactory, times(2)).createDataSource(dataSourceMetadata);
}

private DataSourceMetadata getMetadata() {
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName("testDS");
dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH);
dataSourceMetadata.setAllowedRoles(Collections.emptyList());
dataSourceMetadata.setProperties(ImmutableMap.of());
return dataSourceMetadata;
}

}
Loading

0 comments on commit e0dd50c

Please sign in to comment.