Skip to content

Commit

Permalink
feat(clients): expose waitForTasks to batch helpers [skip-bc] (#4030)
Browse files Browse the repository at this point in the history
  • Loading branch information
shortcuts authored Oct 28, 2024
1 parent 8876e4b commit 7f5fe6b
Show file tree
Hide file tree
Showing 17 changed files with 844 additions and 646 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
runs-on: ubuntu-22.04
timeout-minutes: 10
env:
CACHE_VERSION: 1.02 # bump this to run all clients on the CI.
CACHE_VERSION: 1.10 # bump this to run all clients on the CI.
steps:
- name: debugging - dump GitHub context
env:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,35 +166,38 @@ public partial interface ISearchClient
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T"></typeparam>
Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
/// <inheritdoc cref="SaveObjectsAsync{T}(string, IEnumerable{T}, RequestOptions, CancellationToken)"/>
List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;

/// <summary>
/// Helper: Deletes every records for the given objectIDs. The `chunkedBatch` helper is used under the hood, which creates a `batch` requests with at most 1000 objectIDs in it.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objectIDs">The list of `objectIDs` to remove from the given Algolia `indexName`.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs, RequestOptions options = null, CancellationToken cancellationToken = default);
Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default);
/// <inheritdoc cref="DeleteObjectsAsync(string, IEnumerable{String}, RequestOptions, CancellationToken)"/>
List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, RequestOptions options = null, CancellationToken cancellationToken = default);
List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default);

/// <summary>
/// Helper: Replaces object content of all the given objects according to their respective `objectID` field. The `chunkedBatch` helper is used under the hood, which creates a `batch` requests with at most 1000 objects in it.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to update in the given Algolia `indexName`.</param>
/// <param name="createIfNotExists">To be provided if non-existing objects are passed, otherwise, the call will fail.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
/// <inheritdoc cref="PartialUpdateObjectsAsync{T}(string, IEnumerable{T}, bool, RequestOptions, CancellationToken)"/>
List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;

/// <summary>
/// Helper: Check if an index exists.
Expand Down Expand Up @@ -564,42 +567,44 @@ public List<BatchResponse> ChunkedBatch<T>(string indexName, IEnumerable<T> obje

/// <inheritdoc/>
public async Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects,
bool waitForTasks = false,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, false, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, RequestOptions options = null,
public List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class =>
AsyncHelper.RunSync(() => SaveObjectsAsync(indexName, objects, options, cancellationToken));
AsyncHelper.RunSync(() => SaveObjectsAsync(indexName, objects, waitForTasks, options, cancellationToken));

/// <inheritdoc/>
public async Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs,
bool waitForTasks = false,
RequestOptions options = null,
CancellationToken cancellationToken = default)
{
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, false, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, RequestOptions options = null,
public List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null,
CancellationToken cancellationToken = default) =>
AsyncHelper.RunSync(() => DeleteObjectsAsync(indexName, objectIDs, options, cancellationToken));
AsyncHelper.RunSync(() => DeleteObjectsAsync(indexName, objectIDs, waitForTasks, options, cancellationToken));

/// <inheritdoc/>
public async Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists,
public async Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, false, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists,
public List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false,
RequestOptions options = null, CancellationToken cancellationToken = default) where T : class =>
AsyncHelper.RunSync(() => PartialUpdateObjectsAsync(indexName, objects, createIfNotExists, options, cancellationToken));
AsyncHelper.RunSync(() => PartialUpdateObjectsAsync(indexName, objects, createIfNotExists, waitForTasks, options, cancellationToken));

private static async Task<List<TU>> CreateIterable<TU>(Func<TU, Task<TU>> executeQuery,
Func<TU, bool> stopCondition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,20 +369,22 @@ public suspend fun SearchClient.chunkedBatch(
*
* @param indexName The index in which to perform the request.
* @param objects The list of objects to index.
* @param waitForTask If true, wait for the task to complete.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
*/
public suspend fun SearchClient.saveObjects(
indexName: String,
objects: List<JsonObject>,
waitForTask: Boolean = false,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objects,
action = Action.AddObject,
waitForTask = false,
waitForTask = waitForTask,
batchSize = 1000,
requestOptions = requestOptions,
)
Expand All @@ -393,20 +395,22 @@ public suspend fun SearchClient.saveObjects(
*
* @param indexName The index in which to perform the request.
* @param objectIDs The list of objectIDs to delete from the index.
* @param waitForTask If true, wait for the task to complete.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
*/
public suspend fun SearchClient.deleteObjects(
indexName: String,
objectIDs: List<String>,
waitForTask: Boolean = false,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objectIDs.map { id -> JsonObject(mapOf("objectID" to Json.encodeToJsonElement(id))) },
action = Action.DeleteObject,
waitForTask = false,
waitForTask = waitForTask,
batchSize = 1000,
requestOptions = requestOptions,
)
Expand All @@ -418,6 +422,7 @@ public suspend fun SearchClient.deleteObjects(
* @param indexName The index in which to perform the request.
* @param objects The list of objects to update in the index.
* @param createIfNotExists To be provided if non-existing objects are passed, otherwise, the call will fail..
* @param waitForTask If true, wait for the task to complete.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
Expand All @@ -426,13 +431,14 @@ public suspend fun SearchClient.partialUpdateObjects(
indexName: String,
objects: List<JsonObject>,
createIfNotExists: Boolean,
waitForTask: Boolean = false,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objects,
action = if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate,
waitForTask = false,
waitForTask = waitForTask,
batchSize = 1000,
requestOptions = requestOptions,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ package object extension {
* The index in which to perform the request.
* @param objects
* The list of objects to save.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -259,9 +261,10 @@ package object extension {
def saveObjects(
indexName: String,
objects: Seq[Any],
waitForTasks: Boolean = false,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(indexName, objects, Action.AddObject, false, 1000, requestOptions)
chunkedBatch(indexName, objects, Action.AddObject, waitForTasks, 1000, requestOptions)
}

/** Helper: Deletes every objects for the given objectIDs. The `chunkedBatch` helper is used under the hood, which
Expand All @@ -271,6 +274,8 @@ package object extension {
* The index in which to perform the request.
* @param objectIDs
* The list of objectIDs to delete.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -279,13 +284,14 @@ package object extension {
def deleteObjects(
indexName: String,
objectIDs: Seq[String],
waitForTasks: Boolean = false,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(
indexName,
objectIDs.map(id => new { val objectID: String = id }),
Action.DeleteObject,
false,
waitForTasks,
1000,
requestOptions
)
Expand All @@ -300,6 +306,8 @@ package object extension {
* The list of objects to save.
* @param createIfNotExists
* To be provided if non-existing objects are passed, otherwise, the call will fail.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -309,13 +317,14 @@ package object extension {
indexName: String,
objects: Seq[Any],
createIfNotExists: Boolean = false,
waitForTasks: Boolean = false,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(
indexName,
objects,
if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate,
false,
waitForTasks,
1000,
requestOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,18 +463,20 @@ public extension SearchClient {
/// which creates a `batch` requests with at most 1000 objects in it.
/// - parameter indexName: The name of the index where to save the objects
/// - parameter objects: The new objects
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func saveObjects(
indexName: String,
objects: [some Encodable],
waitForTasks: Bool = false,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objects,
action: .addObject,
waitForTasks: false,
waitForTasks: waitForTasks,
batchSize: 1000,
requestOptions: requestOptions
)
Expand All @@ -484,18 +486,20 @@ public extension SearchClient {
/// creates a `batch` requests with at most 1000 objectIDs in it.
/// - parameter indexName: The name of the index to delete objectIDs from
/// - parameter objectIDs: The objectIDs to delete
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func deleteObjects(
indexName: String,
objectIDs: [String],
waitForTasks: Bool = false,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objectIDs.map { AnyCodable(["objectID": $0]) },
action: .deleteObject,
waitForTasks: false,
waitForTasks: waitForTasks,
batchSize: 1000,
requestOptions: requestOptions
)
Expand All @@ -507,19 +511,21 @@ public extension SearchClient {
/// - parameter objects: The objects to update
/// - parameter createIfNotExists: To be provided if non-existing objects are passed, otherwise, the call will
/// fail..
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func partialUpdateObjects(
indexName: String,
objects: [some Encodable],
createIfNotExists: Bool = false,
waitForTasks: Bool = false,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objects,
action: createIfNotExists ? .partialUpdateObject : .partialUpdateObjectNoCreate,
waitForTasks: false,
waitForTasks: waitForTasks,
batchSize: 1000,
requestOptions: requestOptions
)
Expand Down
20 changes: 10 additions & 10 deletions playground/python/app/search.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
from asyncio import run
from os import environ

from algoliasearch.search.client import SearchClient
from algoliasearch.search.client import SearchClientSync
from algoliasearch.search import __version__
from dotenv import load_dotenv

load_dotenv("../.env")


async def main():
def main():
print("SearchClient version", __version__)

client = SearchClient(
client = SearchClientSync(
environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY")
)
print("client initialized", client)

try:
resp = await client.search(search_method_params={
"requests": [{"indexName": "api-clients-automation"}]
})
print(resp.to_dict())
resp = client.save_objects("foo", [{"foo": "bar"}])
print(resp)

for r in resp:
client.wait_for_task(index_name="foo", task_id=r.task_id)
finally:
await client.close()
client.close()

print("client closed")


run(main())
main()
Loading

0 comments on commit 7f5fe6b

Please sign in to comment.