Skip to content

Commit

Permalink
Add support for SourceOnly snapshot repository (#4073)
Browse files Browse the repository at this point in the history
This commit adds support for SourceOnly repository.

- Introduce a ResetOffset() method on JsonReader to reset the offset back to the initial offset.
- Tidy up XML docs on repositories.
  • Loading branch information
russcam authored Sep 4, 2019
1 parent 169b784 commit 4e8a5ea
Show file tree
Hide file tree
Showing 12 changed files with 376 additions and 117 deletions.
15 changes: 15 additions & 0 deletions src/Elasticsearch.Net/Utf8Json/JsonReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ internal struct JsonReader

readonly byte[] bytes;
int offset;
int initialOffset;

public JsonReader(byte[] bytes)
: this(bytes, 0)
Expand All @@ -60,6 +61,8 @@ public JsonReader(byte[] bytes, int offset)
this.offset = offset += 3;
}
}

this.initialOffset = offset;
}

JsonParsingException CreateParsingException(string expected)
Expand Down Expand Up @@ -113,11 +116,23 @@ bool IsInRange
}
}

/// <summary>
/// Advances the offset by a specified amount
/// </summary>
/// <param name="offset">The amount to offset by</param>
public void AdvanceOffset(int offset)
{
this.offset += offset;
}

/// <summary>
/// Resets the offset of the reader back to the original offset
/// </summary>
public void ResetOffset()
{
this.offset = this.initialOffset;
}

public byte[] GetBufferUnsafe()
{
return bytes;
Expand Down
55 changes: 35 additions & 20 deletions src/Nest/Modules/SnapshotAndRestore/Repositories/AzureRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@

namespace Nest
{
/// <summary>
/// A snapshot repository that stores snapshots in an Azure storage account
/// <para />
/// Requires the repository-azure plugin to be installed on the cluster
/// </summary>
public interface IAzureRepository : IRepository<IAzureRepositorySettings> { }

/// <inheritdoc />
public class AzureRepository : IAzureRepository
{
public AzureRepository() { }
Expand All @@ -16,32 +22,55 @@ public AzureRepository() { }
public string Type { get; } = "azure";
}

/// <summary>
/// Snapshot repository settings for <see cref="IAzureRepository"/>
/// </summary>
public interface IAzureRepositorySettings : IRepositorySettings
{
/// <summary>
/// The path within the container on which to store the snapshot data.
/// </summary>
[DataMember(Name ="base_path")]
string BasePath { get; set; }

/// <summary>
/// Big files can be broken down into chunks during the snapshot process, if needed.
/// The chunk size can be specified in bytes or by using size value notation,
/// i.e. 1g, 10m, 5k. Defaults to 64m (64m max)
/// </summary>
[DataMember(Name ="chunk_size")]
string ChunkSize { get; set; }

/// <summary>
/// When set to true metadata files are stored in compressed format. This setting doesn't
/// affect index files that are already compressed by default. Defaults to <c>false</c>.
/// </summary>
[DataMember(Name ="compress")]
bool? Compress { get; set; }

/// <summary>
/// Tha name of the container
/// </summary>
[DataMember(Name ="container")]
string Container { get; set; }
}

/// <inheritdoc />
public class AzureRepositorySettings : IAzureRepositorySettings
{
/// <inheritdoc />
[DataMember(Name ="base_path")]
public string BasePath { get; set; }

/// <inheritdoc />
[DataMember(Name ="chunk_size")]
public string ChunkSize { get; set; }

/// <inheritdoc />
[DataMember(Name ="compress")]
public bool? Compress { get; set; }

/// <inheritdoc />
[DataMember(Name ="container")]
public string Container { get; set; }
}
Expand All @@ -54,42 +83,28 @@ public class AzureRepositorySettingsDescriptor
bool? IAzureRepositorySettings.Compress { get; set; }
string IAzureRepositorySettings.Container { get; set; }

/// <summary>
/// Container name. Defaults to elasticsearch-snapshots
/// </summary>
/// <param name="container"></param>
/// <inheritdoc cref="IAzureRepositorySettings.Container"/>
public AzureRepositorySettingsDescriptor Container(string container) => Assign(container, (a, v) => a.Container = v);

/// <summary>
/// Specifies the path within container to repository data. Defaults to empty (root directory).
/// </summary>
/// <param name="basePath"></param>
/// <returns></returns>
/// <inheritdoc cref="IAzureRepositorySettings.BasePath"/>
public AzureRepositorySettingsDescriptor BasePath(string basePath) => Assign(basePath, (a, v) => a.BasePath = v);

/// <summary>
/// When set to true metadata files are stored in compressed format. This setting doesn't
/// affect index files that are already compressed by default. Defaults to false.
/// </summary>
/// <param name="compress"></param>
/// <inheritdoc cref="IAzureRepositorySettings.Compress"/>
public AzureRepositorySettingsDescriptor Compress(bool? compress = true) => Assign(compress, (a, v) => a.Compress = v);

/// <summary>
/// Big files can be broken down into chunks during snapshotting if needed.
/// The chunk size can be specified in bytes or by using size value notation,
/// i.e. 1g, 10m, 5k. Defaults to 64m (64m max)
/// </summary>
/// <param name="chunkSize"></param>
/// <inheritdoc cref="IAzureRepositorySettings.ChunkSize"/>
public AzureRepositorySettingsDescriptor ChunkSize(string chunkSize) => Assign(chunkSize, (a, v) => a.ChunkSize = v);
}

/// <inheritdoc cref="IAzureRepository"/>
public class AzureRepositoryDescriptor
: DescriptorBase<AzureRepositoryDescriptor, IAzureRepository>, IAzureRepository
{
IAzureRepositorySettings IRepository<IAzureRepositorySettings>.Settings { get; set; }
string ISnapshotRepository.Type { get; } = "azure";
object IRepositoryWithSettings.DelegateSettings => Self.Settings;

/// <inheritdoc cref="IAzureRepositorySettings"/>
public AzureRepositoryDescriptor Settings(Func<AzureRepositorySettingsDescriptor, IAzureRepositorySettings> settingsSelector) =>
Assign(settingsSelector, (a, v) => a.Settings = v?.Invoke(new AzureRepositorySettingsDescriptor()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,52 @@

namespace Nest
{
/// <summary>
/// Creates a snapshot repository
/// </summary>
[MapsApi("snapshot.create_repository.json")]
[JsonFormatter(typeof(CreateRepositoryFormatter))]
public partial interface ICreateRepositoryRequest
{
/// <summary>
/// The snapshot repository
/// </summary>
ISnapshotRepository Repository { get; set; }
}

/// <inheritdoc cref="ICreateRepositoryRequest" />
public partial class CreateRepositoryRequest
{
/// <inheritdoc />
public ISnapshotRepository Repository { get; set; }
}

/// <inheritdoc cref="ICreateRepositoryRequest" />
public partial class CreateRepositoryDescriptor
{
ISnapshotRepository ICreateRepositoryRequest.Repository { get; set; }

/// <summary>
/// The shared file system repository ("type": "fs") is using shared file system to store snapshot.
/// The path specified in the location parameter should point to the same location in the shared
/// filesystem and be accessible on all data and master nodes.
/// </summary>
/// <inheritdoc cref="IFileSystemRepository"/>
public CreateRepositoryDescriptor FileSystem(Func<FileSystemRepositoryDescriptor, IFileSystemRepository> selector) =>
Assign(selector, (a, v) => a.Repository = v?.Invoke(new FileSystemRepositoryDescriptor()));

/// <summary>
/// The URL repository ("type": "url") can be used as an alternative read-only way to access data
/// created by shared file system repository is using shared file system to store snapshot.
/// </summary>
/// <inheritdoc cref="IReadOnlyUrlRepository" />
public CreateRepositoryDescriptor ReadOnlyUrl(Func<ReadOnlyUrlRepositoryDescriptor, IReadOnlyUrlRepository> selector) =>
Assign(selector, (a, v) => a.Repository = v?.Invoke(new ReadOnlyUrlRepositoryDescriptor()));

/// <summary>
/// Specify an azure storage container to snapshot and restore to. (defaults to a container named elasticsearch-snapshots)
/// </summary>
/// <inheritdoc cref="IAzureRepository" />
public CreateRepositoryDescriptor Azure(Func<AzureRepositoryDescriptor, IAzureRepository> selector = null) =>
Assign(selector.InvokeOrDefault(new AzureRepositoryDescriptor()), (a, v) => a.Repository = v);

/// <summary> Create an snapshot/restore repository that points to an HDFS filesystem </summary>
/// <inheritdoc cref="IHdfsRepository" />
public CreateRepositoryDescriptor Hdfs(Func<HdfsRepositoryDescriptor, IHdfsRepository> selector) =>
Assign(selector, (a, v) => a.Repository = v?.Invoke(new HdfsRepositoryDescriptor()));

/// <summary> Snapshot and restore to an Amazon S3 bucket </summary>
/// <inheritdoc cref="IS3Repository" />
public CreateRepositoryDescriptor S3(Func<S3RepositoryDescriptor, IS3Repository> selector) =>
Assign(selector, (a, v) => a.Repository = v?.Invoke(new S3RepositoryDescriptor()));

/// <summary> Snapshot and restore to an Amazon S3 bucket </summary>
/// <inheritdoc cref="ISourceOnlyRepository" />
public CreateRepositoryDescriptor SourceOnly(Func<SourceOnlyRepositoryDescriptor, ISourceOnlyRepository> selector) =>
Assign(selector, (a, v) => a.Repository = v?.Invoke(new SourceOnlyRepositoryDescriptor()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

namespace Nest
{
/// <summary>
/// A snapshot repository that uses a shared file system to store snapshot data.
/// The path specified in the location parameter should point to the same location in the shared
/// filesystem and be accessible on all data and master nodes.
/// </summary>
public interface IFileSystemRepository : IRepository<IFileSystemRepositorySettings> { }

/// <inheritdoc />
public class FileSystemRepository : IFileSystemRepository
{
public FileSystemRepository(FileSystemRepositorySettings settings) => Settings = settings;
Expand All @@ -15,29 +21,53 @@ public class FileSystemRepository : IFileSystemRepository
public string Type { get; } = "fs";
}

/// <summary>
/// Repository settings for <see cref="IFileSystemRepository"/>
/// </summary>
public interface IFileSystemRepositorySettings : IRepositorySettings
{
/// <summary>
/// Big files can be broken down into chunks during the snapshot process, if needed.
/// The chunk size can be specified in bytes or by using size value notation, i.e. 1g, 10m, 5k.
/// Defaults to null (unlimited chunk size).
/// </summary>
[DataMember(Name ="chunk_size")]
string ChunkSize { get; set; }

/// <summary>
/// Turns on compression of the snapshot files. Defaults to true.
/// </summary>
[DataMember(Name ="compress")]
[JsonFormatter(typeof(NullableStringBooleanFormatter))]
bool? Compress { get; set; }

/// <summary>
/// Throttles the number of streams (per node) preforming snapshot operation. Defaults to 5
/// </summary>
[DataMember(Name ="concurrent_streams")]
[JsonFormatter(typeof(NullableStringIntFormatter))]
int? ConcurrentStreams { get; set; }

/// <summary>
/// Location of the snapshots. Mandatory.
/// </summary>
[DataMember(Name ="location")]
string Location { get; set; }

/// <summary>
/// Throttles per node restore rate. Defaults to 20mb per second.
/// </summary>
[DataMember(Name ="max_restore_bytes_per_second")]
string RestoreBytesPerSecondMaximum { get; set; }

/// <summary>
/// Throttles per node snapshot rate. Defaults to 20mb per second.
/// </summary>
[DataMember(Name ="max_snapshot_bytes_per_second")]
string SnapshotBytesPerSecondMaximum { get; set; }
}

/// <inheritdoc cref="IFileSystemRepositorySettings"/>
public class FileSystemRepositorySettings : IFileSystemRepositorySettings
{
internal FileSystemRepositorySettings() { }
Expand All @@ -57,6 +87,7 @@ internal FileSystemRepositorySettings() { }
public string SnapshotBytesPerSecondMaximum { get; set; }
}

/// <inheritdoc cref="IFileSystemRepositorySettings"/>
public class FileSystemRepositorySettingsDescriptor
: DescriptorBase<FileSystemRepositorySettingsDescriptor, IFileSystemRepositorySettings>, IFileSystemRepositorySettings
{
Expand All @@ -67,55 +98,37 @@ public class FileSystemRepositorySettingsDescriptor
string IFileSystemRepositorySettings.RestoreBytesPerSecondMaximum { get; set; }
string IFileSystemRepositorySettings.SnapshotBytesPerSecondMaximum { get; set; }

/// <summary>
/// Location of the snapshots. Mandatory.
/// </summary>
/// <param name="location"></param>
/// <inheritdoc cref="IFileSystemRepositorySettings.Location"/>
public FileSystemRepositorySettingsDescriptor Location(string location) => Assign(location, (a, v) => a.Location = v);

/// <summary>
/// Turns on compression of the snapshot files. Defaults to true.
/// </summary>
/// <param name="compress"></param>
/// <inheritdoc cref="IFileSystemRepositorySettings.Compress"/>
public FileSystemRepositorySettingsDescriptor Compress(bool? compress = true) => Assign(compress, (a, v) => a.Compress = v);

/// <summary>
/// Throttles the number of streams (per node) preforming snapshot operation. Defaults to 5
/// </summary>
/// <param name="concurrentStreams"></param>
/// <inheritdoc cref="IFileSystemRepositorySettings.ConcurrentStreams"/>
public FileSystemRepositorySettingsDescriptor ConcurrentStreams(int? concurrentStreams) =>
Assign(concurrentStreams, (a, v) => a.ConcurrentStreams = v);

/// <summary>
/// Big files can be broken down into chunks during snapshotting if needed.
/// The chunk size can be specified in bytes or by using size value notation, i.e. 1g, 10m, 5k.
/// Defaults to null (unlimited chunk size).
/// </summary>
/// <param name="chunkSize"></param>
/// <inheritdoc cref="IFileSystemRepositorySettings.ChunkSize"/>
public FileSystemRepositorySettingsDescriptor ChunkSize(string chunkSize) => Assign(chunkSize, (a, v) => a.ChunkSize = v);

/// <summary>
/// Throttles per node restore rate. Defaults to 20mb per second.
/// </summary>
/// <param name="maximumBytesPerSecond"></param>
/// <inheritdoc cref="IFileSystemRepositorySettings.RestoreBytesPerSecondMaximum"/>
public FileSystemRepositorySettingsDescriptor RestoreBytesPerSecondMaximum(string maximumBytesPerSecond) =>
Assign(maximumBytesPerSecond, (a, v) => a.RestoreBytesPerSecondMaximum = v);

/// <summary>
/// Throttles per node snapshot rate. Defaults to 20mb per second.
/// </summary>
/// <param name="maximumBytesPerSecond"></param>
/// <inheritdoc cref="IFileSystemRepositorySettings.SnapshotBytesPerSecondMaximum"/>
public FileSystemRepositorySettingsDescriptor SnapshotBytesPerSecondMaximum(string maximumBytesPerSecond) =>
Assign(maximumBytesPerSecond, (a, v) => a.SnapshotBytesPerSecondMaximum = v);
}

/// <inheritdoc cref="IFileSystemRepository"/>
public class FileSystemRepositoryDescriptor
: DescriptorBase<FileSystemRepositoryDescriptor, IFileSystemRepository>, IFileSystemRepository
{
IFileSystemRepositorySettings IRepository<IFileSystemRepositorySettings>.Settings { get; set; }
object IRepositoryWithSettings.DelegateSettings => Self.Settings;
string ISnapshotRepository.Type { get; } = "fs";

/// <inheritdoc cref="IFileSystemRepositorySettings"/>
public FileSystemRepositoryDescriptor Settings(string location,
Func<FileSystemRepositorySettingsDescriptor, IFileSystemRepositorySettings> settingsSelector = null
) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ public GetRepositoryResponse Deserialize(ref JsonReader reader, IJsonFormatterRe
var hdfs = GetRepository<HdfsRepository, HdfsRepositorySettings>(settings, formatterResolver);
repositories.Add(name, hdfs);
break;
case "source":
// reset the offset
snapshotSegmentReader.ResetOffset();
var source = formatterResolver.GetFormatter<ISourceOnlyRepository>()
.Deserialize(ref snapshotSegmentReader, formatterResolver);
repositories.Add(name, source);
break;
}
}
}
Expand Down
Loading

0 comments on commit 4e8a5ea

Please sign in to comment.