diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/CronSchedule.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/CronSchedule.java similarity index 75% rename from x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/CronSchedule.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/CronSchedule.java index 0a093742cdc29..b2c763e47f39c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/CronSchedule.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/CronSchedule.java @@ -3,15 +3,12 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.rollup.job; - -import org.elasticsearch.xpack.core.scheduler.Cron; -import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +package org.elasticsearch.xpack.core.scheduler; public class CronSchedule implements SchedulerEngine.Schedule { private final Cron cron; - CronSchedule(String cronExpression) { + public CronSchedule(String cronExpression) { this.cron = new Cron(cronExpression); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index 71784a8e9ebfd..0c062c0687938 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -17,9 +17,12 @@ import java.time.Clock; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -136,6 +139,10 @@ public void stop() { } } + public Set scheduledJobIds() { + return Collections.unmodifiableSet(new HashSet<>(schedules.keySet())); + } + public void add(Job job) { ActiveSchedule schedule = new ActiveSchedule(job.getId(), job.getSchedule(), clock.millis()); schedules.compute(schedule.name, (name, previousSchedule) -> { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 2e7d2fbbc555d..664c0d07b74df 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.indexlifecycle; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; @@ -23,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.ActionPlugin; @@ -42,9 +44,9 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; -import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction; import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction; import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction; +import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction; import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction; import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction; @@ -78,7 +80,18 @@ import org.elasticsearch.xpack.indexlifecycle.action.TransportRetryAction; import org.elasticsearch.xpack.indexlifecycle.action.TransportStartILMAction; import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction; - +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService; +import org.elasticsearch.xpack.snapshotlifecycle.action.DeleteSnapshotLifecycleAction; +import org.elasticsearch.xpack.snapshotlifecycle.action.GetSnapshotLifecycleAction; +import org.elasticsearch.xpack.snapshotlifecycle.action.PutSnapshotLifecycleAction; +import org.elasticsearch.xpack.snapshotlifecycle.action.RestDeleteSnapshotLifecycleAction; +import org.elasticsearch.xpack.snapshotlifecycle.action.RestGetSnapshotLifecycleAction; +import org.elasticsearch.xpack.snapshotlifecycle.action.RestPutSnapshotLifecycleAction; +import org.elasticsearch.xpack.snapshotlifecycle.action.TransportDeleteSnapshotLifecycleAction; +import org.elasticsearch.xpack.snapshotlifecycle.action.TransportGetSnapshotLifecycleAction; +import org.elasticsearch.xpack.snapshotlifecycle.action.TransportPutSnapshotLifecycleAction; + +import java.io.IOException; import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; @@ -91,6 +104,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin { private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); + private final SetOnce snapshotLifecycleService = new SetOnce<>(); private Settings settings; private boolean enabled; private boolean transportClientMode; @@ -137,12 +151,13 @@ public Collection createComponents(Client client, ClusterService cluster } indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, getClock(), System::currentTimeMillis, xContentRegistry)); - return Collections.singletonList(indexLifecycleInitialisationService.get()); + snapshotLifecycleService.set(new SnapshotLifecycleService(settings, client, clusterService, getClock())); + return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get()); } @Override public List getNamedWriteables() { - return Arrays.asList(); + return Collections.emptyList(); } @Override @@ -184,7 +199,11 @@ public List getRestHandlers(Settings settings, RestController restC new RestRetryAction(settings, restController), new RestStopAction(settings, restController), new RestStartILMAction(settings, restController), - new RestGetStatusAction(settings, restController) + new RestGetStatusAction(settings, restController), + // Snapshot lifecycle actions + new RestPutSnapshotLifecycleAction(settings, restController), + new RestDeleteSnapshotLifecycleAction(settings, restController), + new RestGetSnapshotLifecycleAction(settings, restController) ); } @@ -203,14 +222,19 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(RetryAction.INSTANCE, TransportRetryAction.class), new ActionHandler<>(StartILMAction.INSTANCE, TransportStartILMAction.class), new ActionHandler<>(StopILMAction.INSTANCE, TransportStopILMAction.class), - new ActionHandler<>(GetStatusAction.INSTANCE, TransportGetStatusAction.class)); + new ActionHandler<>(GetStatusAction.INSTANCE, TransportGetStatusAction.class), + // Snapshot lifecycle actions + new ActionHandler<>(PutSnapshotLifecycleAction.INSTANCE, TransportPutSnapshotLifecycleAction.class), + new ActionHandler<>(DeleteSnapshotLifecycleAction.INSTANCE, TransportDeleteSnapshotLifecycleAction.class), + new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class)); } @Override public void close() { - IndexLifecycleService lifecycleService = indexLifecycleInitialisationService.get(); - if (lifecycleService != null) { - lifecycleService.close(); + try { + IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get()); + } catch (IOException e) { + throw new ElasticsearchException("unable to close index lifecycle services", e); } } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java new file mode 100644 index 0000000000000..dbeb2686affcb --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleMetadata.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.DiffableUtils; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.XPackPlugin.XPackMetaDataCustom; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Map; +import java.util.TreeMap; + +/** + * Custom cluster state metadata that stores all the snapshot lifecycle + * policies and their associated metadata + */ +public class SnapshotLifecycleMetadata implements XPackMetaDataCustom { + + public static final String TYPE = "snapshot_lifecycle"; + + private final Map snapshotConfigurations; + + public SnapshotLifecycleMetadata(Map snapshotConfigurations) { + this.snapshotConfigurations = Collections.unmodifiableMap(snapshotConfigurations); + // TODO: maybe operation mode here so it can be disabled/re-enabled separately like ILM is + } + + public SnapshotLifecycleMetadata(StreamInput in) throws IOException { + this.snapshotConfigurations = in.readMap(StreamInput::readString, SnapshotLifecyclePolicyMetadata::new); + } + + public Map getSnapshotConfigurations() { + return this.snapshotConfigurations; + } + + @Override + public EnumSet context() { + return MetaData.ALL_CONTEXTS; + } + + @Override + public Diff diff(MetaData.Custom previousState) { + return new SnapshotLifecycleMetadataDiff((SnapshotLifecycleMetadata) previousState, this); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_8_0_0; // TODO: revisit this when we figure out where this goes + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(this.snapshotConfigurations, StreamOutput::writeString, (out1, value) -> value.writeTo(out1)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("policies", this.snapshotConfigurations); + return builder; + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class SnapshotLifecycleMetadataDiff implements NamedDiff { + + final Diff> lifecycles; + + SnapshotLifecycleMetadataDiff(SnapshotLifecycleMetadata before, SnapshotLifecycleMetadata after) { + this.lifecycles = DiffableUtils.diff(before.snapshotConfigurations, after.snapshotConfigurations, + DiffableUtils.getStringKeySerializer()); + } + + @Override + public MetaData.Custom apply(MetaData.Custom part) { + TreeMap newLifecycles = new TreeMap<>( + lifecycles.apply(((SnapshotLifecycleMetadata) part).snapshotConfigurations)); + return new SnapshotLifecycleMetadata(newLifecycles); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + lifecycles.writeTo(out); + } + + static Diff readLifecyclePolicyDiffFrom(StreamInput in) throws IOException { + return AbstractDiffable.readDiffFrom(SnapshotLifecyclePolicy::new, in); + } + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicy.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicy.java new file mode 100644 index 0000000000000..2e5d6495f5b39 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicy.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle; + +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diffable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.ValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * A {@code SnapshotLifecyclePolicy} is a policy for the cluster including a schedule of when a + * snapshot should be triggered, what the snapshot should be named, what repository it should go + * to, and the configuration for the snapshot itself. + */ +public class SnapshotLifecyclePolicy extends AbstractDiffable + implements Writeable, Diffable, ToXContentObject { + + private final String id; + private final String name; + private final String schedule; + private final String repository; + private final Map configuration; + + private static final ParseField NAME = new ParseField("name"); + private static final ParseField SCHEDULE = new ParseField("schedule"); + private static final ParseField REPOSITORY = new ParseField("repository"); + private static final ParseField CONFIG = new ParseField("config"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("snapshot_lifecycle", true, + (a, id) -> { + String name = (String) a[0]; + String schedule = (String) a[1]; + String repo = (String) a[2]; + Map config = (Map) a[3]; + return new SnapshotLifecyclePolicy(id, name, schedule, repo, config); + }); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME); + PARSER.declareString(ConstructingObjectParser.constructorArg(), SCHEDULE); + PARSER.declareString(ConstructingObjectParser.constructorArg(), REPOSITORY); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), CONFIG); + } + + public SnapshotLifecyclePolicy(final String id, final String name, final String schedule, + final String repository, Map configuration) { + this.id = id; + this.name = name; + this.schedule = schedule; + this.repository = repository; + this.configuration = configuration; + } + + public SnapshotLifecyclePolicy(StreamInput in) throws IOException { + this.id = in.readString(); + this.name = in.readString(); + this.schedule = in.readString(); + this.repository = in.readString(); + this.configuration = in.readMap(); + } + + public String getId() { + return this.id; + } + + public String getName() { + return this.name; + } + + public String getSchedule() { + return this.schedule; + } + + public String getRepository() { + return this.repository; + } + + public Map getConfig() { + return this.configuration; + } + + public ValidationException validate() { + // TODO: implement validation + return null; + } + + public CreateSnapshotRequest toRequest() { + throw new UnsupportedOperationException("implement me"); + } + + public static SnapshotLifecyclePolicy parse(XContentParser parser, String id) { + return PARSER.apply(parser, id); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.id); + out.writeString(this.name); + out.writeString(this.schedule); + out.writeString(this.repository); + out.writeMap(this.configuration); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME.getPreferredName(), this.name); + builder.field(SCHEDULE.getPreferredName(), this.schedule); + builder.field(REPOSITORY.getPreferredName(), this.repository); + builder.field(CONFIG.getPreferredName(), this.configuration); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(id, name, schedule, repository, configuration); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj.getClass() != getClass()) { + return false; + } + SnapshotLifecyclePolicy other = (SnapshotLifecyclePolicy) obj; + return Objects.equals(id, other.id) && + Objects.equals(name, other.name) && + Objects.equals(schedule, other.schedule) && + Objects.equals(repository, other.repository) && + Objects.equals(configuration, other.configuration); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyMetadata.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyMetadata.java new file mode 100644 index 0000000000000..ab52ab84d9944 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyMetadata.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle; + +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diffable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Map; +import java.util.Objects; + +/** + * {@code SnapshotLifecyclePolicyMetadata} encapsulates a {@link SnapshotLifecyclePolicy} as well as + * the additional meta information link headers used for execution, version (a monotonically + * incrementing number), and last modified date + */ +public class SnapshotLifecyclePolicyMetadata extends AbstractDiffable + implements ToXContentObject, Diffable { + + static final ParseField POLICY = new ParseField("policy"); + static final ParseField HEADERS = new ParseField("headers"); + static final ParseField VERSION = new ParseField("version"); + static final ParseField MODIFIED_DATE = new ParseField("modified_date"); + static final ParseField MODIFIED_DATE_STRING = new ParseField("modified_date_string"); + + private final SnapshotLifecyclePolicy policy; + private final Map headers; + private final long version; + private final long modifiedDate; + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("snapshot_policy_metadata", + a -> { + SnapshotLifecyclePolicy policy = (SnapshotLifecyclePolicy) a[0]; + return new SnapshotLifecyclePolicyMetadata(policy, (Map) a[1], (long) a[2], (long) a[3]); + }); + + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), SnapshotLifecyclePolicy::parse, POLICY); + PARSER.declareField(ConstructingObjectParser.constructorArg(), XContentParser::mapStrings, HEADERS, ObjectParser.ValueType.OBJECT); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), MODIFIED_DATE); + PARSER.declareString(ConstructingObjectParser.constructorArg(), MODIFIED_DATE_STRING); + } + + public SnapshotLifecyclePolicyMetadata(SnapshotLifecyclePolicy policy, Map headers, long version, long modifiedDate) { + this.policy = policy; + this.headers = headers; + this.version = version; + this.modifiedDate = modifiedDate; + } + + @SuppressWarnings("unchecked") + SnapshotLifecyclePolicyMetadata(StreamInput in) throws IOException { + this.policy = new SnapshotLifecyclePolicy(in); + this.headers = (Map) in.readGenericValue(); + this.version = in.readVLong(); + this.modifiedDate = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + this.policy.writeTo(out); + out.writeGenericValue(this.headers); + out.writeVLong(this.version); + out.writeVLong(this.modifiedDate); + } + + public Map getHeaders() { + return headers; + } + + public SnapshotLifecyclePolicy getPolicy() { + return policy; + } + + public String getName() { + return policy.getName(); + } + + public long getVersion() { + return version; + } + + public long getModifiedDate() { + return modifiedDate; + } + + public String getModifiedDateString() { + ZonedDateTime modifiedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(modifiedDate), ZoneOffset.UTC); + return modifiedDateTime.toString(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(POLICY.getPreferredName(), policy); + builder.field(HEADERS.getPreferredName(), headers); + builder.field(VERSION.getPreferredName(), version); + builder.field(MODIFIED_DATE.getPreferredName(), modifiedDate); + builder.field(MODIFIED_DATE_STRING.getPreferredName(), getModifiedDateString()); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(policy, headers, version, modifiedDate); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + SnapshotLifecyclePolicyMetadata other = (SnapshotLifecyclePolicyMetadata) obj; + return Objects.equals(policy, other.policy) && + Objects.equals(headers, other.headers) && + Objects.equals(version, other.version) && + Objects.equals(modifiedDate, other.modifiedDate); + } + + @Override + public String toString() { + // Note: this is on purpose. While usually we would use Strings.toString(this) to render + // this using toXContent, it may contain sensitive information in the headers and thus + // should not emit them in case it accidentally gets logged. + return super.toString(); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java new file mode 100644 index 0000000000000..c281d8e63874e --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleService.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.scheduler.CronSchedule; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; + +import java.io.Closeable; +import java.time.Clock; +import java.util.Map; + +/** + * {@code SnapshotLifecycleService} manages snapshot policy scheduling and triggering of the + * {@link SnapshotLifecycleTask}. It reacts to new policies in the cluster state by scheduling a + * task according to the policy's schedule. + */ +public class SnapshotLifecycleService implements LocalNodeMasterListener, Closeable, ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(SnapshotLifecycleMetadata.class); + + private final SchedulerEngine scheduler; + private final ClusterService clusterService; + private final SnapshotLifecycleTask snapshotTask; + private final Map scheduledTasks = ConcurrentCollections.newConcurrentMap(); + private volatile boolean isMaster = false; + + public SnapshotLifecycleService(Settings settings, Client client, ClusterService clusterService, + Clock clock) { + this.scheduler = new SchedulerEngine(settings, clock); + this.clusterService = clusterService; + this.snapshotTask = new SnapshotLifecycleTask(client); + clusterService.addLocalNodeMasterListener(this); // TODO: change this not to use 'this' + clusterService.addListener(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (this.isMaster) { + // TODO: handle modified policies (currently they are ignored) + // TODO: handle deleted policies + scheduleSnapshotJobs(event.state()); + } + } + + @Override + public void onMaster() { + this.isMaster = true; + scheduler.register(snapshotTask); + scheduleSnapshotJobs(clusterService.state()); + } + + @Override + public void offMaster() { + this.isMaster = false; + scheduler.unregister(snapshotTask); + cancelSnapshotJobs(); + } + + /** + * Schedule all non-scheduled snapshot jobs contained in the cluster state + */ + public void scheduleSnapshotJobs(final ClusterState state) { + SnapshotLifecycleMetadata snapMeta = state.metaData().custom(SnapshotLifecycleMetadata.TYPE); + if (snapMeta != null) { + snapMeta.getSnapshotConfigurations().values().forEach(this::maybeScheduleSnapshot); + } + } + + /** + * Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. If the job already + * exists it is not interfered with. + */ + public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) { + final String jobId = snapshotLifecyclePolicy.getPolicy().getId(); + scheduledTasks.computeIfAbsent(jobId, id -> { + final SchedulerEngine.Job job = new SchedulerEngine.Job(jobId, + new CronSchedule(snapshotLifecyclePolicy.getPolicy().getSchedule())); + logger.info("scheduling snapshot lifecycle job [{}]", jobId); + scheduler.add(job); + return job; + }); + } + + /** + * Cancel all scheduled snapshot jobs + */ + public void cancelSnapshotJobs() { + scheduler.scheduledJobIds().forEach(scheduler::remove); + scheduledTasks.clear(); + } + + /** + * Cancel the given snapshot lifecycle id + */ + public void cancelScheduledSnapshot(final String snapshotLifecycleId) { + scheduledTasks.remove(snapshotLifecycleId); + scheduler.remove(snapshotLifecycleId); + } + + @Override + public String executorName() { + return ThreadPool.Names.SNAPSHOT; + } + + @Override + public void close() { + this.scheduler.stop(); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java new file mode 100644 index 0000000000000..3c1b1df34fddc --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleTask.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; + +public class SnapshotLifecycleTask implements SchedulerEngine.Listener { + + private static Logger logger = LogManager.getLogger(SnapshotLifecycleTask.class); + + private final Client client; + + SnapshotLifecycleTask(final Client client) { + this.client = client; + } + + @Override + public void triggered(SchedulerEngine.Event event) { + logger.info("--> triggered job: {}", event); + // TODO: implement snapshotting the indices from the job + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/DeleteSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/DeleteSnapshotLifecycleAction.java new file mode 100644 index 0000000000000..eca85152a1112 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/DeleteSnapshotLifecycleAction.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; + +import java.io.IOException; +import java.util.Objects; + +public class DeleteSnapshotLifecycleAction extends Action { + public static final DeleteSnapshotLifecycleAction INSTANCE = new DeleteSnapshotLifecycleAction(); + public static final String NAME = "cluster:admin/ilm/snapshot/delete"; + + protected DeleteSnapshotLifecycleAction() { + super(NAME); + } + + @Override + public DeleteSnapshotLifecycleAction.Response newResponse() { + return new Response(); + } + + public static class Request extends AcknowledgedRequest { + + private String lifecycleId; + + Request(String lifecycleId) { + this.lifecycleId = Objects.requireNonNull(lifecycleId, "id may not be null"); + } + + Request() { + } + + public String getLifecycleId() { + return this.lifecycleId; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + lifecycleId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(lifecycleId); + } + + @Override + public int hashCode() { + return lifecycleId.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(lifecycleId, other.lifecycleId); + } + } + + public static class Response extends AcknowledgedResponse implements ToXContentObject { + + public Response() { + } + + public Response(boolean acknowledged) { + super(acknowledged); + } + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/GetSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/GetSnapshotLifecycleAction.java new file mode 100644 index 0000000000000..7994c4e194b18 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/GetSnapshotLifecycleAction.java @@ -0,0 +1,208 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecyclePolicy; +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecyclePolicyMetadata; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class GetSnapshotLifecycleAction extends Action { + public static final GetSnapshotLifecycleAction INSTANCE = new GetSnapshotLifecycleAction(); + public static final String NAME = "cluster:admin/ilm/snapshot/get"; + + protected GetSnapshotLifecycleAction() { + super(NAME); + } + + @Override + public GetSnapshotLifecycleAction.Response newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + public Writeable.Reader getResponseReader() { + return GetSnapshotLifecycleAction.Response::new; + } + + public static class Request extends AcknowledgedRequest { + + private String[] lifecycleIds; + + Request(String... lifecycleIds) { + this.lifecycleIds = Objects.requireNonNull(lifecycleIds, "ids may not be null"); + } + + Request() { + this.lifecycleIds = Strings.EMPTY_ARRAY; + } + + public String[] getLifecycleIds() { + return this.lifecycleIds; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + lifecycleIds = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(lifecycleIds); + } + + @Override + public int hashCode() { + return Arrays.hashCode(lifecycleIds); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + Request other = (Request) obj; + return Arrays.equals(lifecycleIds, other.lifecycleIds); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private List lifecycles; + + public Response() { } + + public Response(List lifecycles) { + this.lifecycles = lifecycles; + } + + public Response(StreamInput in) throws IOException { + this.lifecycles = in.readList(SnapshotLifecyclePolicyItem::new); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + for (SnapshotLifecyclePolicyItem item : lifecycles) { + item.toXContent(builder, params); + } + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(lifecycles); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + Response other = (Response) obj; + return lifecycles.equals(other.lifecycles); + } + } + + /** + * The {@code SnapshotLifecyclePolicyItem} class is a special wrapper almost exactly like the + * {@link SnapshotLifecyclePolicyMetadata}, however, it elides the headers to ensure that they + * are not leaked to the user since they may contain sensitive information. + */ + public static class SnapshotLifecyclePolicyItem implements ToXContentFragment { + + private final SnapshotLifecyclePolicy policy; + private final long version; + private final long modifiedDate; + + public SnapshotLifecyclePolicyItem(SnapshotLifecyclePolicy policy, long version, long modifiedDate) { + this.policy = policy; + this.version = version; + this.modifiedDate = modifiedDate; + } + + public SnapshotLifecyclePolicyItem(StreamInput in) throws IOException { + this.policy = new SnapshotLifecyclePolicy(in); + this.version = in.readVLong(); + this.modifiedDate = in.readVLong(); + } + + public SnapshotLifecyclePolicy getPolicy() { + return policy; + } + + public long getVersion() { + return version; + } + + public long getModifiedDate() { + return modifiedDate; + } + + @Override + public int hashCode() { + return Objects.hash(policy, version, modifiedDate); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + SnapshotLifecyclePolicyItem other = (SnapshotLifecyclePolicyItem) obj; + return policy.equals(other.policy) && + version == other.version && + modifiedDate == other.modifiedDate; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(policy.getId()); + builder.field("version", version); + builder.field("modified_date", modifiedDate); + builder.field("policy", policy); + builder.endObject(); + return builder; + } + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/PutSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/PutSnapshotLifecycleAction.java new file mode 100644 index 0000000000000..a6d88a27674cb --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/PutSnapshotLifecycleAction.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecyclePolicy; + +import java.io.IOException; +import java.util.Objects; + +public class PutSnapshotLifecycleAction extends Action { + public static final PutSnapshotLifecycleAction INSTANCE = new PutSnapshotLifecycleAction(); + public static final String NAME = "cluster:admin/ilm/snapshot/put"; + + protected PutSnapshotLifecycleAction() { + super(NAME); + } + + @Override + public PutSnapshotLifecycleAction.Response newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; + } + + public static class Request extends AcknowledgedRequest implements ToXContentObject { + + private String lifecycleId; + private SnapshotLifecyclePolicy lifecycle; + + Request(String lifecycleId, SnapshotLifecyclePolicy lifecycle) { + this.lifecycleId = lifecycleId; + this.lifecycle = lifecycle; + } + + Request() { } + + public String getLifecycleId() { + return this.lifecycleId; + } + + public SnapshotLifecyclePolicy getLifecycle() { + return this.lifecycle; + } + + public static Request parseRequest(String lifecycleId, XContentParser parser) { + return new Request(lifecycleId, SnapshotLifecyclePolicy.parse(parser, lifecycleId)); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + lifecycleId = in.readString(); + lifecycle = new SnapshotLifecyclePolicy(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(lifecycleId); + lifecycle.writeTo(out); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(lifecycleId, lifecycle); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(lifecycleId, lifecycle); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + Request other = (Request) obj; + return lifecycleId.equals(other.lifecycleId) && + lifecycle.equals(other.lifecycle); + } + + @Override + public String toString() { + return Strings.toString(this); + } + } + + public static class Response extends AcknowledgedResponse implements ToXContentObject { + + public Response() { } + + public Response(boolean acknowledged) { + super(acknowledged); + } + + public Response(StreamInput streamInput) throws IOException { + this(streamInput.readBoolean()); + } + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/RestDeleteSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/RestDeleteSnapshotLifecycleAction.java new file mode 100644 index 0000000000000..2a9a291b86412 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/RestDeleteSnapshotLifecycleAction.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle.action; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +public class RestDeleteSnapshotLifecycleAction extends BaseRestHandler { + + public RestDeleteSnapshotLifecycleAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.DELETE, "/_ilm/snapshot/{name}", this); + } + + @Override + public String getName() { + return "ilm_delete_snapshot_lifecycle"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + String lifecycleId = request.param("name"); + DeleteSnapshotLifecycleAction.Request req = new DeleteSnapshotLifecycleAction.Request(lifecycleId); + req.timeout(request.paramAsTime("timeout", req.timeout())); + req.masterNodeTimeout(request.paramAsTime("master_timeout", req.masterNodeTimeout())); + + return channel -> client.execute(DeleteSnapshotLifecycleAction.INSTANCE, req, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/RestGetSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/RestGetSnapshotLifecycleAction.java new file mode 100644 index 0000000000000..ac4443c7e7ad5 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/RestGetSnapshotLifecycleAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle.action; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +public class RestGetSnapshotLifecycleAction extends BaseRestHandler { + + public RestGetSnapshotLifecycleAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.GET, "/_ilm/snapshot", this); + controller.registerHandler(RestRequest.Method.GET, "/_ilm/snapshot/{name}", this); + } + + @Override + public String getName() { + return "ilm_get_snapshot_lifecycle"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + String[] lifecycleNames = Strings.splitStringByCommaToArray(request.param("name")); + GetSnapshotLifecycleAction.Request req = new GetSnapshotLifecycleAction.Request(lifecycleNames); + req.timeout(request.paramAsTime("timeout", req.timeout())); + req.masterNodeTimeout(request.paramAsTime("master_timeout", req.masterNodeTimeout())); + + return channel -> client.execute(GetSnapshotLifecycleAction.INSTANCE, req, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/RestPutSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/RestPutSnapshotLifecycleAction.java new file mode 100644 index 0000000000000..a0eba8da655d5 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/RestPutSnapshotLifecycleAction.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle.action; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; + +public class RestPutSnapshotLifecycleAction extends BaseRestHandler { + + public RestPutSnapshotLifecycleAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.PUT, "/_ilm/snapshot/{name}", this); + } + + @Override + public String getName() { + return "ilm_put_snapshot_lifecycle"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String snapLifecycleName = request.param("name"); + try (XContentParser parser = request.contentParser()) { + PutSnapshotLifecycleAction.Request req = PutSnapshotLifecycleAction.Request.parseRequest(snapLifecycleName, parser); + req.timeout(request.paramAsTime("timeout", req.timeout())); + req.masterNodeTimeout(request.paramAsTime("master_timeout", req.masterNodeTimeout())); + return channel -> client.execute(PutSnapshotLifecycleAction.INSTANCE, req, new RestToXContentListener<>(channel)); + } + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportDeleteSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportDeleteSnapshotLifecycleAction.java new file mode 100644 index 0000000000000..2c66cb842a2c1 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportDeleteSnapshotLifecycleAction.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleMetadata; +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecyclePolicyMetadata; + +import java.util.Map; +import java.util.stream.Collectors; + +public class TransportDeleteSnapshotLifecycleAction extends + TransportMasterNodeAction { + + @Inject + public TransportDeleteSnapshotLifecycleAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(DeleteSnapshotLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, DeleteSnapshotLifecycleAction.Request::new); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected DeleteSnapshotLifecycleAction.Response newResponse() { + return new DeleteSnapshotLifecycleAction.Response(); + } + + @Override + protected void masterOperation(DeleteSnapshotLifecycleAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("delete-snapshot-lifecycle-" + request.getLifecycleId(), + new AckedClusterStateUpdateTask(request, listener) { + @Override + protected DeleteSnapshotLifecycleAction.Response newResponse(boolean acknowledged) { + return new DeleteSnapshotLifecycleAction.Response(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) { + SnapshotLifecycleMetadata snapMeta = currentState.metaData().custom(SnapshotLifecycleMetadata.TYPE); + if (snapMeta == null) { + throw new ResourceNotFoundException("snapshot lifecycle policy not found: {}", request.getLifecycleId()); + } + // Check that the policy exists in the first place + snapMeta.getSnapshotConfigurations().entrySet().stream() + .filter(e -> e.getValue().getPolicy().getId().equals(request.getLifecycleId())) + .findAny() + .orElseThrow(() -> new ResourceNotFoundException("snapshot lifecycle policy not found: {}", + request.getLifecycleId())); + + Map newConfigs = snapMeta.getSnapshotConfigurations().entrySet().stream() + .filter(e -> e.getKey().equals(request.getLifecycleId()) == false) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + MetaData metaData = currentState.metaData(); + return ClusterState.builder(currentState) + .metaData(MetaData.builder(metaData) + .putCustom(SnapshotLifecycleMetadata.TYPE, + new SnapshotLifecycleMetadata(newConfigs))) + .build(); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(DeleteSnapshotLifecycleAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportGetSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportGetSnapshotLifecycleAction.java new file mode 100644 index 0000000000000..2e7ecf51b6ef5 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportGetSnapshotLifecycleAction.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleMetadata; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class TransportGetSnapshotLifecycleAction extends + TransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportPutSnapshotLifecycleAction.class); + + @Inject + public TransportGetSnapshotLifecycleAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(GetSnapshotLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, + GetSnapshotLifecycleAction.Request::new); + } + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected GetSnapshotLifecycleAction.Response newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + protected GetSnapshotLifecycleAction.Response read(StreamInput in) throws IOException { + return new GetSnapshotLifecycleAction.Response(in); + } + + @Override + protected void masterOperation(final GetSnapshotLifecycleAction.Request request, + final ClusterState state, + final ActionListener listener) { + SnapshotLifecycleMetadata snapMeta = state.metaData().custom(SnapshotLifecycleMetadata.TYPE); + if (snapMeta == null) { + listener.onResponse(new GetSnapshotLifecycleAction.Response(Collections.emptyList())); + } else { + final Set ids = new HashSet<>(Arrays.asList(request.getLifecycleIds())); + List lifecycles = snapMeta.getSnapshotConfigurations() + .values() + .stream() + .filter(meta -> { + if (ids.isEmpty()) { + return true; + } else { + return ids.contains(meta.getPolicy().getId()); + } + }) + .map(meta -> + new GetSnapshotLifecycleAction.SnapshotLifecyclePolicyItem(meta.getPolicy(), + meta.getVersion(), meta.getModifiedDate())) + .collect(Collectors.toList()); + listener.onResponse(new GetSnapshotLifecycleAction.Response(lifecycles)); + } + } + + @Override + protected ClusterBlockException checkBlock(GetSnapshotLifecycleAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportPutSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportPutSnapshotLifecycleAction.java new file mode 100644 index 0000000000000..ba8b62235a8b0 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/action/TransportPutSnapshotLifecycleAction.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleMetadata; +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecyclePolicyMetadata; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class TransportPutSnapshotLifecycleAction extends + TransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportPutSnapshotLifecycleAction.class); + + @Inject + public TransportPutSnapshotLifecycleAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(PutSnapshotLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, + PutSnapshotLifecycleAction.Request::new); + } + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected PutSnapshotLifecycleAction.Response newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + protected PutSnapshotLifecycleAction.Response read(StreamInput in) throws IOException { + return new PutSnapshotLifecycleAction.Response(in); + } + + @Override + protected void masterOperation(final PutSnapshotLifecycleAction.Request request, + final ClusterState state, + final ActionListener listener) { + // headers from the thread context stored by the AuthenticationService to be shared between the + // REST layer and the Transport layer here must be accessed within this thread and not in the + // cluster state thread in the ClusterStateUpdateTask below since that thread does not share the + // same context, and therefore does not have access to the appropriate security headers. + final Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + LifecyclePolicy.validatePolicyName(request.getLifecycleId()); + clusterService.submitStateUpdateTask("put-snapshot-lifecycle-" + request.getLifecycleId(), + new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + SnapshotLifecycleMetadata snapMeta = currentState.metaData().custom(SnapshotLifecycleMetadata.TYPE); + + String id = request.getLifecycleId(); + final SnapshotLifecycleMetadata lifecycleMetadata; + if (snapMeta == null) { + SnapshotLifecyclePolicyMetadata meta = new SnapshotLifecyclePolicyMetadata(request.getLifecycle(), filteredHeaders, + 0, Instant.now().toEpochMilli()); + lifecycleMetadata = new SnapshotLifecycleMetadata(Collections.singletonMap(id, meta)); + logger.info("adding new snapshot lifecycle [{}]", id); + } else { + Map snapLifecycles = new HashMap<>(snapMeta.getSnapshotConfigurations()); + SnapshotLifecyclePolicyMetadata oldLifecycle = snapLifecycles.get(id); + SnapshotLifecyclePolicyMetadata newLifecycle = new SnapshotLifecyclePolicyMetadata(request.getLifecycle(), + filteredHeaders, oldLifecycle == null ? 0L : oldLifecycle.getVersion() + 1, Instant.now().toEpochMilli()); + snapLifecycles.put(id, newLifecycle); + lifecycleMetadata = new SnapshotLifecycleMetadata(snapLifecycles); + if (oldLifecycle == null) { + logger.info("adding new snapshot lifecycle [{}]", id); + } else { + logger.info("updating existing snapshot lifecycle [{}]", id); + } + } + + MetaData currentMeta = currentState.metaData(); + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentMeta) + .putCustom(SnapshotLifecycleMetadata.TYPE, lifecycleMetadata)) + .build(); + } + + @Override + protected PutSnapshotLifecycleAction.Response newResponse(boolean acknowledged) { + return new PutSnapshotLifecycleAction.Response(acknowledged); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(PutSnapshotLifecycleAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyTests.java new file mode 100644 index 0000000000000..3ecde4b8e45c6 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecyclePolicyTests.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class SnapshotLifecyclePolicyTests extends AbstractSerializingTestCase { + + private String id; + + @Override + protected SnapshotLifecyclePolicy doParseInstance(XContentParser parser) throws IOException { + return SnapshotLifecyclePolicy.parse(parser, id); + } + + @Override + protected SnapshotLifecyclePolicy createTestInstance() { + Map config = new HashMap<>(); + for (int i = 0; i < randomIntBetween(2, 5); i++) { + config.put(randomAlphaOfLength(4), randomAlphaOfLength(4)); + } + id = randomAlphaOfLength(5); + return new SnapshotLifecyclePolicy(id, + randomAlphaOfLength(4), + randomSchedule(), + randomAlphaOfLength(4), + config); + } + + private String randomSchedule() { + return randomIntBetween(0, 59) + " " + + randomIntBetween(0, 59) + " " + + randomIntBetween(0, 12) + " * * ?"; + } + + @Override + protected SnapshotLifecyclePolicy mutateInstance(SnapshotLifecyclePolicy instance) throws IOException { + switch (between(0, 4)) { + case 0: + return new SnapshotLifecyclePolicy(instance.getId() + randomAlphaOfLength(2), + instance.getName(), + instance.getSchedule(), + instance.getRepository(), + instance.getConfig()); + case 1: + return new SnapshotLifecyclePolicy(instance.getId(), + instance.getName() + randomAlphaOfLength(2), + instance.getSchedule(), + instance.getRepository(), + instance.getConfig()); + case 2: + return new SnapshotLifecyclePolicy(instance.getId(), + instance.getName(), + randomValueOtherThan(instance.getSchedule(), this::randomSchedule), + instance.getRepository(), + instance.getConfig()); + case 3: + return new SnapshotLifecyclePolicy(instance.getId(), + instance.getName(), + instance.getSchedule(), + instance.getRepository() + randomAlphaOfLength(2), + instance.getConfig()); + case 4: + Map newConfig = new HashMap<>(); + for (int i = 0; i < randomIntBetween(2, 5); i++) { + newConfig.put(randomAlphaOfLength(3), randomAlphaOfLength(3)); + } + return new SnapshotLifecyclePolicy(instance.getId(), + instance.getName() + randomAlphaOfLength(2), + instance.getSchedule(), + instance.getRepository(), + newConfig); + default: + throw new AssertionError("failure, got illegal switch case"); + } + } + + @Override + protected Writeable.Reader instanceReader() { + return SnapshotLifecyclePolicy::new; + } +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index f545ab049d44d..3773a0eb4fd96 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; +import org.elasticsearch.xpack.core.scheduler.CronSchedule; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.rollup.Rollup;