Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SnapshotLifecycleService and related CRUD APIs #39795

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.rollup.job;

import org.elasticsearch.xpack.core.scheduler.Cron;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
package org.elasticsearch.xpack.core.scheduler;

public class CronSchedule implements SchedulerEngine.Schedule {
private final Cron cron;

CronSchedule(String cronExpression) {
public CronSchedule(String cronExpression) {
this.cron = new Cron(cronExpression);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -136,6 +139,10 @@ public void stop() {
}
}

public Set<String> scheduledJobIds() {
return Collections.unmodifiableSet(new HashSet<>(schedules.keySet()));
}

public void add(Job job) {
ActiveSchedule schedule = new ActiveSchedule(job.getId(), job.getSchedule(), clock.millis());
schedules.compute(schedule.name, (name, previousSchedule) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.indexlifecycle;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
Expand All @@ -42,9 +44,9 @@
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction;
import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction;
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction;
Expand Down Expand Up @@ -78,7 +80,18 @@
import org.elasticsearch.xpack.indexlifecycle.action.TransportRetryAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportStartILMAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction;

import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService;
import org.elasticsearch.xpack.snapshotlifecycle.action.DeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.GetSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.PutSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestDeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestGetSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestPutSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportDeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportGetSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportPutSnapshotLifecycleAction;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -91,6 +104,7 @@

public class IndexLifecycle extends Plugin implements ActionPlugin {
private final SetOnce<IndexLifecycleService> indexLifecycleInitialisationService = new SetOnce<>();
private final SetOnce<SnapshotLifecycleService> snapshotLifecycleService = new SetOnce<>();
private Settings settings;
private boolean enabled;
private boolean transportClientMode;
Expand Down Expand Up @@ -137,12 +151,13 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
}
indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool,
getClock(), System::currentTimeMillis, xContentRegistry));
return Collections.singletonList(indexLifecycleInitialisationService.get());
snapshotLifecycleService.set(new SnapshotLifecycleService(settings, client, clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get());
}

@Override
public List<Entry> getNamedWriteables() {
return Arrays.asList();
return Collections.emptyList();
}

@Override
Expand Down Expand Up @@ -184,7 +199,11 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestRetryAction(settings, restController),
new RestStopAction(settings, restController),
new RestStartILMAction(settings, restController),
new RestGetStatusAction(settings, restController)
new RestGetStatusAction(settings, restController),
// Snapshot lifecycle actions
new RestPutSnapshotLifecycleAction(settings, restController),
new RestDeleteSnapshotLifecycleAction(settings, restController),
new RestGetSnapshotLifecycleAction(settings, restController)
);
}

Expand All @@ -203,14 +222,19 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new ActionHandler<>(RetryAction.INSTANCE, TransportRetryAction.class),
new ActionHandler<>(StartILMAction.INSTANCE, TransportStartILMAction.class),
new ActionHandler<>(StopILMAction.INSTANCE, TransportStopILMAction.class),
new ActionHandler<>(GetStatusAction.INSTANCE, TransportGetStatusAction.class));
new ActionHandler<>(GetStatusAction.INSTANCE, TransportGetStatusAction.class),
// Snapshot lifecycle actions
new ActionHandler<>(PutSnapshotLifecycleAction.INSTANCE, TransportPutSnapshotLifecycleAction.class),
new ActionHandler<>(DeleteSnapshotLifecycleAction.INSTANCE, TransportDeleteSnapshotLifecycleAction.class),
new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class));
}

@Override
public void close() {
IndexLifecycleService lifecycleService = indexLifecycleInitialisationService.get();
if (lifecycleService != null) {
lifecycleService.close();
try {
IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get());
} catch (IOException e) {
throw new ElasticsearchException("unable to close index lifecycle services", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.snapshotlifecycle;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.XPackPlugin.XPackMetaDataCustom;

import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.TreeMap;

/**
* Custom cluster state metadata that stores all the snapshot lifecycle
* policies and their associated metadata
*/
public class SnapshotLifecycleMetadata implements XPackMetaDataCustom {

public static final String TYPE = "snapshot_lifecycle";

private final Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations;

public SnapshotLifecycleMetadata(Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations) {
this.snapshotConfigurations = Collections.unmodifiableMap(snapshotConfigurations);
// TODO: maybe operation mode here so it can be disabled/re-enabled separately like ILM is
}

public SnapshotLifecycleMetadata(StreamInput in) throws IOException {
this.snapshotConfigurations = in.readMap(StreamInput::readString, SnapshotLifecyclePolicyMetadata::new);
}

public Map<String, SnapshotLifecyclePolicyMetadata> getSnapshotConfigurations() {
return this.snapshotConfigurations;
}

@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.ALL_CONTEXTS;
}

@Override
public Diff<MetaData.Custom> diff(MetaData.Custom previousState) {
return new SnapshotLifecycleMetadataDiff((SnapshotLifecycleMetadata) previousState, this);
}

@Override
public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_8_0_0; // TODO: revisit this when we figure out where this goes
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.snapshotConfigurations, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("policies", this.snapshotConfigurations);
return builder;
}

@Override
public String toString() {
return Strings.toString(this);
}

public static class SnapshotLifecycleMetadataDiff implements NamedDiff<MetaData.Custom> {

final Diff<Map<String, SnapshotLifecyclePolicyMetadata>> lifecycles;

SnapshotLifecycleMetadataDiff(SnapshotLifecycleMetadata before, SnapshotLifecycleMetadata after) {
this.lifecycles = DiffableUtils.diff(before.snapshotConfigurations, after.snapshotConfigurations,
DiffableUtils.getStringKeySerializer());
}

@Override
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, SnapshotLifecyclePolicyMetadata> newLifecycles = new TreeMap<>(
lifecycles.apply(((SnapshotLifecycleMetadata) part).snapshotConfigurations));
return new SnapshotLifecycleMetadata(newLifecycles);
}

@Override
public String getWriteableName() {
return TYPE;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
lifecycles.writeTo(out);
}

static Diff<SnapshotLifecyclePolicy> readLifecyclePolicyDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(SnapshotLifecyclePolicy::new, in);
}
}
}
Loading