Skip to content

Commit

Permalink
[NOCOMMIT] Hacky ingestion source wiring ideas
Browse files Browse the repository at this point in the history
  • Loading branch information
msfroh committed Nov 20, 2024
1 parent 5068fad commit ffb98d6
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.Assertions;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput;
Expand Down Expand Up @@ -661,6 +662,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store";
public static final String TRANSLOG_METADATA_KEY = "translog_metadata";
public static final String CONTEXT_KEY = "context";
public static final String KEY_INGESTION_SOURCE = "ingestion_source";

public static final String INDEX_STATE_FILE_PREFIX = "state-";

Expand Down Expand Up @@ -1352,6 +1354,38 @@ public static Builder builder(IndexMetadata indexMetadata) {
return new Builder(indexMetadata);
}

public static class IngestionSource {
public String type;
public Map<String, Object> config;

public static IngestionSource fromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token
parser.nextToken();
}
String currentField = null;
String type;
JsonXContent config = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
currentField = parser.currentName();
} else if ("type".equals(currentField)) {
type = parser.text();
} else if ("config".equals(currentField)) {
XContentBuilder.builder(JsonXContent.jsonXContent)
config = parser.map();
}
}
config = Map.of(
"topic", "test",
"bootstrapServer", "localhost:9092",
"Props", Map.of(
"auto.commit", false
)
);
}
}


/**
* Builder of index metadata.
*
Expand Down Expand Up @@ -1474,6 +1508,10 @@ public Builder settings(Settings settings) {
return this;
}

public Builder ingestionSource(IngestionSource source) {
//
}

public MappingMetadata mapping() {
return mappings.get(MapperService.SINGLE_MAPPING_NAME);
}
Expand Down Expand Up @@ -1956,6 +1994,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
parser.skipChildren();
} else if (CONTEXT_KEY.equals(currentFieldName)) {
builder.context(Context.fromXContent(parser));
} else if (KEY_INGESTION_SOURCE.equals(currentFieldName)) {
builder.ingestionSourceConfig(IngestionSource.fromXcontent(parser));
} else {
// assume it's custom index metadata
builder.putCustom(currentFieldName, parser.mapStrings());
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,8 @@ public IndexService newIndexService(
remoteStoreSettings,
fileCache,
compositeIndexSettings,
replicator
replicator,
ingestionSourceFactory
);
success = true;
return indexService;
Expand Down
5 changes: 4 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,8 @@ protected void closeInternal() {
path
);
eventListener.onStoreCreated(shardId);

IngestionSource ingestionSource = ingestionSourceFactory.create(shardId);
indexShard = new IndexShard(
routing,
this.indexSettings,
Expand Down Expand Up @@ -691,7 +693,8 @@ protected void closeInternal() {
recoverySettings,
remoteStoreSettings,
seedRemote,
discoveryNodes
discoveryNodes,
ingestionSource
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final Map<String, IngestionSourceFactoryProvider> ingestionSourceFactoryProviders;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);
private volatile boolean idFieldDataEnabled;
Expand Down Expand Up @@ -949,6 +950,9 @@ private synchronized IndexService createIndexService(
indexCreationContext
);

IndexMetadata.IngestionSourceConfig ingestionSource = indexMetadata.getIngestionSource();
IngestionSourceFactory ingestionSourceFactory = ingestionSourceFactoryProviderRegistry.get(ingestionSource.type).create(ingestionSource.config);

final IndexModule indexModule = new IndexModule(
idxSettings,
analysisRegistry,
Expand All @@ -959,7 +963,8 @@ private synchronized IndexService createIndexService(
indexNameExpressionResolver,
recoveryStateFactories,
fileCache,
compositeIndexSettings
compositeIndexSettings,
ingestionSourceFactory
);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
Expand Down
7 changes: 6 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,10 @@ protected Node(
CacheModule cacheModule = new CacheModule(pluginsService.filterPlugins(CachePlugin.class), settings);
CacheService cacheService = cacheModule.getCacheService();
final SegmentReplicator segmentReplicator = new SegmentReplicator(threadPool);
Map<String, IngestionSourceFactoryProvider> ingestionSourceFactoryProviders = new HashMap<>();
pluginsService.filterPlugins(IngestionSourcePlugin.class)
.stream()
.forEach(p -> ingestionSourceFactoryProviders.putAll(p.getIngestionSourceFactoryProviders()));
final IndicesService indicesService = new IndicesService(
settings,
pluginsService,
Expand Down Expand Up @@ -949,7 +953,8 @@ protected Node(
remoteStoreSettings,
fileCache,
compositeIndexSettings,
segmentReplicator::startReplication
segmentReplicator::startReplication,
ingestionSourceFactoryProviders
);

final IngestService ingestService = new IngestService(
Expand Down

0 comments on commit ffb98d6

Please sign in to comment.