Skip to content

Commit

Permalink
Adds point in time APIs (opensearch-project#461)
Browse files Browse the repository at this point in the history
* Adds point in time APIs

Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>

* Add version check for Point in time tests

Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>

* Update point in time namespace

Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>

* Update point in time to pit everywhere

Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>

* Update USER_GUIDE.md

Co-authored-by: Andriy Redko <drreta@gmail.com>
Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>

* Update USER_GUIDE.md

Co-authored-by: Andriy Redko <drreta@gmail.com>
Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>

* Using AssumeTrue for tests for PIT

Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>

---------

Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>
Co-authored-by: Andriy Redko <drreta@gmail.com>
  • Loading branch information
harshavamsi and reta authored May 3, 2023
1 parent 4b9bcbc commit 4ceb43b
Show file tree
Hide file tree
Showing 19 changed files with 1,844 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

### Added
- Document HTTP/2 support ([#330](https://github.com/opensearch-project/opensearch-java/pull/330))
- Added Point-In-Time APIs ([#461](https://github.com/opensearch-project/opensearch-java/pull/461))

### Dependencies

Expand Down
54 changes: 51 additions & 3 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
- [Create a data stream](#create-a-data-stream)
- [Get data stream](#get-data-stream)
- [Data stream stats](#data-stream-stats)
- [Delete data stream](#delete-data-stream-and-backing-indices)
- [Delete data stream and backing indices](#delete-data-stream-and-backing-indices)
- [Point-In-Time API](#point-in-time-api)
- [Creating a point in time](#creating-a-point-in-time)
- [List all point in time](#list-all-point-in-time)
- [Delete point in time](#delete-point-in-time)
- [Cat API](#cat-api)
- [Cat Indices](#cat-indices)
- [Cat Aliases](#cat-aliases)
- [Cat Nodes](#cat-nodes)
- [Cat aliases](#cat-aliases)
- [Cat nodes](#cat-nodes)
- [Cat point in time segments](#cat-point-in-time-segments)
- [Using different transport options](#using-different-transport-options)
- [Amazon OpenSearch Service](#amazon-opensearch-service)

Expand Down Expand Up @@ -266,6 +271,42 @@ DeleteDataStreamRequest deleteDataStreamRequest = new DeleteDataStreamRequest.Bu
DeleteDataStreamResponse deleteDataStreamResponse = javaClient().indices().deleteDataStream(deleteDataStreamRequest);
```

## Point-In-Time API

### Creating a point in time

Creates a PIT. The keep_alive query parameter is required; it specifies how long to keep a PIT.

```java
CreatePitRequest createPitRequest = new CreatePitRequest.Builder()
.targetIndexes(Collections.singletonList(index))
.keepAlive(new Time.Builder().time("100m").build()).build();

CreatePitResponse createPitResponse = javaClient()
.createPit(createPitRequest);
```

### List all point in time

Returns all PITs in the OpenSearch cluster.

```java
ListAllPitResponse listAllPitResponse = javaClient().listAllPit();
```

### Delete point in time

Deletes one, several, or all PITs. PITs are automatically deleted when the keep_alive time period elapses. However, to deallocate resources, you can delete a PIT using the Delete PIT API. The Delete PIT API supports deleting a list of PITs by ID or deleting all PITs at once.

```java
DeletePitRequest deletePitRequest = new DeletePitRequest.Builder()
.pitId(Collections.singletonList("pit_id")).build();

DeletePitResponse deletePitResponse = javaClient()
.deletePit(deletePitRequest);
```


## Cat API

### Cat Indices
Expand All @@ -291,6 +332,13 @@ The following sample code cat nodes sorted by cpu
NodesResponse nodesResponse = javaClient().cat().nodes(r -> r.sort("cpu"));
```

### Cat point in time segments
Similarly to the CAT Segments API, the PIT Segments API provides low-level information about the disk utilization of a PIT by describing its Lucene segments.
```java
SegmentsResponse pitSegmentsResponse = javaClient().cat()
.pitSegments(r -> r.headers("index,shard,id,segment,size"));
```

# Using different transport options

## Amazon OpenSearch Service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@
import org.opensearch.client.opensearch.core.UpdateByQueryRethrottleResponse;
import org.opensearch.client.opensearch.core.UpdateRequest;
import org.opensearch.client.opensearch.core.UpdateResponse;
import org.opensearch.client.opensearch.core.pit.CreatePitRequest;
import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
import org.opensearch.client.opensearch.core.pit.DeletePitRequest;
import org.opensearch.client.opensearch.core.pit.DeletePitResponse;
import org.opensearch.client.opensearch.core.pit.ListAllPitRequest;
import org.opensearch.client.opensearch.core.pit.ListAllPitResponse;
import org.opensearch.client.opensearch.dangling_indices.OpenSearchDanglingIndicesAsyncClient;
import org.opensearch.client.opensearch.features.OpenSearchFeaturesAsyncClient;
import org.opensearch.client.opensearch.indices.OpenSearchIndicesAsyncClient;
Expand Down Expand Up @@ -362,6 +368,39 @@ public final <TDocument> CompletableFuture<CreateResponse> create(
return create(fn.apply(new CreateRequest.Builder<TDocument>()).build());
}

// ----- Endpoint: create_point_in_time

/**
* Provides low-level information about the disk utilization of a PIT by
* describing its Lucene segments.
*
*
*/

public CompletableFuture<CreatePitResponse> createPit(CreatePitRequest request)
throws IOException, OpenSearchException {
@SuppressWarnings("unchecked")
JsonEndpoint<CreatePitRequest, CreatePitResponse, ErrorResponse> endpoint = (JsonEndpoint<CreatePitRequest, CreatePitResponse, ErrorResponse>) CreatePitRequest._ENDPOINT;

return this.transport.performRequestAsync(request, endpoint, this.transportOptions);
}

/**
* Provides low-level information about the disk utilization of a PIT by
* describing its Lucene segments.
*
* @param fn
* a function that initializes a builder to create the
* {@link CreatePitRequest}
*
*/

public final CompletableFuture<CreatePitResponse> createPit(
Function<CreatePitRequest.Builder, ObjectBuilder<CreatePitRequest>> fn)
throws IOException, OpenSearchException {
return createPit(fn.apply(new CreatePitRequest.Builder()).build());
}

// ----- Endpoint: delete

/**
Expand Down Expand Up @@ -393,6 +432,37 @@ public final CompletableFuture<DeleteResponse> delete(
return delete(fn.apply(new DeleteRequest.Builder()).build());
}

// ----- Endpoint: delete_point_in_time

/**
* Delete Point In Time
*
*
*/

public CompletableFuture<DeletePitResponse> deletePit(DeletePitRequest request)
throws IOException, OpenSearchException {
@SuppressWarnings("unchecked")
JsonEndpoint<DeletePitRequest, DeletePitResponse, ErrorResponse> endpoint = (JsonEndpoint<DeletePitRequest, DeletePitResponse, ErrorResponse>) DeletePitRequest._ENDPOINT;

return this.transport.performRequestAsync(request, endpoint, this.transportOptions);
}

/**
* Delete Point In Time
*
* @param fn
* a function that initializes a builder to create the
* {@link DeletePitRequest}
*
*/

public final CompletableFuture<DeletePitResponse> deletePit(
Function<DeletePitRequest.Builder, ObjectBuilder<DeletePitRequest>> fn)
throws IOException, OpenSearchException {
return deletePit(fn.apply(new DeletePitRequest.Builder()).build());
}

// ----- Endpoint: delete_by_query

/**
Expand Down Expand Up @@ -801,6 +871,20 @@ public CompletableFuture<InfoResponse> info() throws IOException, OpenSearchExce
return this.transport.performRequestAsync(InfoRequest._INSTANCE, InfoRequest._ENDPOINT, this.transportOptions);
}

// ----- Endpoint: list_point_in_time

/**
* List all Point In Time
*
*
*/

public CompletableFuture<ListAllPitResponse> listAllPit()
throws IOException, OpenSearchException {
return this.transport.performRequestAsync(ListAllPitRequest._INSTANCE, ListAllPitRequest._ENDPOINT,
this.transportOptions);
}

// ----- Endpoint: mget

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@
import org.opensearch.client.opensearch.core.UpdateByQueryRethrottleResponse;
import org.opensearch.client.opensearch.core.UpdateRequest;
import org.opensearch.client.opensearch.core.UpdateResponse;
import org.opensearch.client.opensearch.core.pit.CreatePitRequest;
import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
import org.opensearch.client.opensearch.core.pit.DeletePitRequest;
import org.opensearch.client.opensearch.core.pit.DeletePitResponse;
import org.opensearch.client.opensearch.core.pit.ListAllPitRequest;
import org.opensearch.client.opensearch.core.pit.ListAllPitResponse;
import org.opensearch.client.opensearch.dangling_indices.OpenSearchDanglingIndicesClient;
import org.opensearch.client.opensearch.features.OpenSearchFeaturesClient;
import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient;
Expand Down Expand Up @@ -359,6 +365,39 @@ public final <TDocument> CreateResponse create(
return create(fn.apply(new CreateRequest.Builder<TDocument>()).build());
}

// ----- Endpoint: create_point_in_time

/**
* Provides low-level information about the disk utilization of a PIT by
* describing its Lucene segments.
*
*
*/

public CreatePitResponse createPit(CreatePitRequest request)
throws IOException, OpenSearchException {
@SuppressWarnings("unchecked")
JsonEndpoint<CreatePitRequest, CreatePitResponse, ErrorResponse> endpoint = (JsonEndpoint<CreatePitRequest, CreatePitResponse, ErrorResponse>) CreatePitRequest._ENDPOINT;

return this.transport.performRequest(request, endpoint, this.transportOptions);
}

/**
* Provides low-level information about the disk utilization of a PIT by
* describing its Lucene segments.
*
* @param fn
* a function that initializes a builder to create the
* {@link CreatePitRequest}
*
*/

public final CreatePitResponse createPit(
Function<CreatePitRequest.Builder, ObjectBuilder<CreatePitRequest>> fn)
throws IOException, OpenSearchException {
return createPit(fn.apply(new CreatePitRequest.Builder()).build());
}

// ----- Endpoint: delete

/**
Expand Down Expand Up @@ -389,6 +428,37 @@ public final DeleteResponse delete(Function<DeleteRequest.Builder, ObjectBuilder
return delete(fn.apply(new DeleteRequest.Builder()).build());
}

// ----- Endpoint: delete_point_in_time

/**
* Delete Point In Time
*
*
*/

public DeletePitResponse deletePit(DeletePitRequest request)
throws IOException, OpenSearchException {
@SuppressWarnings("unchecked")
JsonEndpoint<DeletePitRequest, DeletePitResponse, ErrorResponse> endpoint = (JsonEndpoint<DeletePitRequest, DeletePitResponse, ErrorResponse>) DeletePitRequest._ENDPOINT;

return this.transport.performRequest(request, endpoint, this.transportOptions);
}

/**
* Delete Point In Time
*
* @param fn
* a function that initializes a builder to create the
* {@link DeletePitRequest}
*
*/

public final DeletePitResponse deletePit(
Function<DeletePitRequest.Builder, ObjectBuilder<DeletePitRequest>> fn)
throws IOException, OpenSearchException {
return deletePit(fn.apply(new DeletePitRequest.Builder()).build());
}

// ----- Endpoint: delete_by_query

/**
Expand Down Expand Up @@ -790,6 +860,19 @@ public InfoResponse info() throws IOException, OpenSearchException {
return this.transport.performRequest(InfoRequest._INSTANCE, InfoRequest._ENDPOINT, this.transportOptions);
}

// ----- Endpoint: list_point_in_time

/**
* List all Point In Time
*
*
*/

public ListAllPitResponse listAllPit()
throws IOException, OpenSearchException {
return this.transport.performRequest(ListAllPitRequest._INSTANCE, ListAllPitRequest._ENDPOINT, this.transportOptions);
}

// ----- Endpoint: mget

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,20 @@ public CompletableFuture<NodesResponse> nodes() throws IOException, OpenSearchEx
this.transportOptions);
}

// ----- Endpoint: cat.point_in_time_segments

/**
* Provides low-level information about the disk utilization of a PIT by
* describing its Lucene segments.
*
*
*/
public CompletableFuture<SegmentsResponse> pitSegments() throws IOException, OpenSearchException {
return this.transport.performRequestAsync(new PitSegmentsRequest.Builder().build(),
PitSegmentsRequest._ENDPOINT,
this.transportOptions);
}

// ----- Endpoint: cat.pending_tasks

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,47 @@ public NodesResponse nodes() throws IOException, OpenSearchException {
this.transportOptions);
}

// ----- Endpoint: cat.point_in_time_segments

/**
* Provides low-level information about the disk utilization of a PIT by
* describing its Lucene segments.
*
*
*/
public SegmentsResponse pitSegments(PitSegmentsRequest request)
throws IOException, OpenSearchException {
@SuppressWarnings("unchecked")
JsonEndpoint<PitSegmentsRequest, SegmentsResponse, ErrorResponse> endpoint = (JsonEndpoint<PitSegmentsRequest, SegmentsResponse, ErrorResponse>) PitSegmentsRequest._ENDPOINT;

return this.transport.performRequest(request, endpoint, this.transportOptions);
}

/**
* Provides low-level information about the disk utilization of a PIT by
* describing its Lucene segments.
*
* * @param fn
* a function that initializes a builder to create the
* {@link PitSegmentsRequest}
*/

public final SegmentsResponse pitSegments(Function<PitSegmentsRequest.Builder, ObjectBuilder<PitSegmentsRequest>> fn)
throws IOException, OpenSearchException {
return pitSegments(fn.apply(new PitSegmentsRequest.Builder()).build());
}

/**
* Provides low-level information about the disk utilization of a PIT by
* describing its Lucene segments.
*
*/
public SegmentsResponse pitSegments() throws IOException, OpenSearchException {
return this.transport.performRequest(new PitSegmentsRequest.Builder().build(),
PitSegmentsRequest._ENDPOINT,
this.transportOptions);
}

// ----- Endpoint: cat.pending_tasks

/**
Expand Down
Loading

0 comments on commit 4ceb43b

Please sign in to comment.