Managed streaming - better estimation for decision for streaming vs queuing #368

Open
wants to merge 24 commits into
base: master
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [6.0.0] - 2024-09-21
## [6.0.0] - 2024-10-27
### Added
- A new heuristic for choosing between queuing and streaming in the managed streaming client. By default, the client compares
a size estimate against the 4 MB limit after dividing or multiplying it by a factor described by the consts:

This also allows users to stream uncompressed data larger than 4 MB.
### Fixed
- Improved some error messages

### Changed
- Replaced Apache CloseableHttpClient with configurable azure-core client.
- [BREAKING] HttpClientFactory now accepts clients implementing azure-core HttpClient.
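
A minimal sketch of the kind of size-based decision the "Added" entry above describes, assuming uncompressed input is divided by an assumed compression factor before being compared with the 4 MB limit. The class name, method names, constant names, and the factor value are all hypothetical; the actual consts are not shown in this changelog entry.

```java
// Hypothetical sketch: names and the factor value are assumptions,
// not the SDK's actual constants or API.
final class StreamingDecisionSketch {
    // The 4 MB streaming limit mentioned in the changelog entry.
    static final long MAX_STREAMING_SIZE_BYTES = 4L * 1024 * 1024;

    // Assumed ratio between uncompressed and compressed size (illustrative only).
    static final double ASSUMED_COMPRESSION_FACTOR = 3.0;

    /** Estimates how many bytes would be sent once the data is compressed. */
    static long estimateCompressedSize(long sizeBytes, boolean alreadyCompressed) {
        if (alreadyCompressed) {
            return sizeBytes; // nothing left to gain from compression
        }
        return (long) Math.ceil(sizeBytes / ASSUMED_COMPRESSION_FACTOR);
    }

    /** Returns true when the estimate fits under the limit, false when queuing is the safer choice. */
    static boolean shouldStream(long sizeBytes, boolean alreadyCompressed) {
        return estimateCompressedSize(sizeBytes, alreadyCompressed) <= MAX_STREAMING_SIZE_BYTES;
    }
}
```

Under these assumptions, a 6 MB uncompressed payload would be streamed (its estimate is 2 MB), while a 6 MB payload that is already compressed would be queued.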
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ private String processResponseBody(HttpResponse response) throws DataServiceExce
}
}

protected InputStream postToStreamingOutput(HttpRequest request) throws DataServiceException {
return postToStreamingOutput(request, 0);
}

// Todo: Implement async version of this method
protected InputStream postToStreamingOutput(HttpRequest request, int redirectCount) throws DataServiceException {

Expand Down Expand Up @@ -143,7 +139,7 @@ public static DataServiceException createExceptionFromResponse(String url, HttpR
}

private static void closeResourcesIfNeeded(boolean returnInputStream, HttpResponse httpResponse) {
// If we close the resources after returning the InputStream to the user, he won't be able to read from it - used in streaming query
// If we close the resources after returning the InputStream to the user, he won't be able to read from it - used in streaming query
if (!returnInputStream) {
if (httpResponse != null) {
httpResponse.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static StreamingClient createStreamingClient(ConnectionStringBuilder csb,
* is customized with the given properties.
*
* @param csb the connection string builder
* @param httpClient HTTP client
* @param httpClient HTTP client
* @return a fully constructed {@linkplain StreamingClient} instance
* @throws URISyntaxException if the cluster URL is invalid
*/
Expand Down
15 changes: 11 additions & 4 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,20 @@
import com.microsoft.azure.kusto.data.auth.TokenProviderBase;
import com.microsoft.azure.kusto.data.auth.TokenProviderFactory;
import com.microsoft.azure.kusto.data.auth.endpoints.KustoTrustedEndpoints;
import com.microsoft.azure.kusto.data.exceptions.*;
import com.microsoft.azure.kusto.data.http.*;
import com.microsoft.azure.kusto.data.instrumentation.*;
import com.microsoft.azure.kusto.data.req.KustoRequest;
import com.microsoft.azure.kusto.data.req.KustoRequestContext;
import com.microsoft.azure.kusto.data.res.JsonResult;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.KustoServiceQueryError;
import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import com.microsoft.azure.kusto.data.http.HttpClientFactory;
import com.microsoft.azure.kusto.data.http.UncloseableStream;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions;
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import org.apache.commons.lang3.StringUtils;

import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -119,9 +127,9 @@ private KustoOperationResult processJsonResult(JsonResult res) throws DataServic
return new KustoOperationResult(res.getResult(), res.getEndpoint().endsWith("v2/rest/query") ? "v2" : "v1");
} catch (KustoServiceQueryError e) {
throw new DataServiceException(res.getEndpoint(),
"Error found while parsing json response as KustoOperationResult:" + e.getMessage(), e, e.isPermanent());
"Error found while parsing json response as KustoOperationResult:" + e, e, e.isPermanent());
Contributor

So here you remove getMessage and don't use getMessageEx. Why? Are we sure this won't give us worse results?

Collaborator Author

See my answer there.

} catch (Exception e) {
throw new DataClientException(res.getEndpoint(), e.getMessage(), e);
throw new DataClientException(res.getEndpoint(), ExceptionsUtils.getMessageEx(e), e);
}
}
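
For context on the review question about replacing `e.getMessage()` with `e` in the string concatenation: concatenating an exception calls its `toString()`, which for a plain `Throwable` prefixes the message with the class name. The sketch below illustrates the difference with a standard exception; the class and method names are hypothetical, and it assumes `toString()` is not overridden (whether `KustoServiceQueryError` overrides it is not shown in this diff).

```java
// Hypothetical sketch comparing the two ways of building the error text.
final class ExceptionTextSketch {
    // Old style: only the message is appended.
    static String viaGetMessage(Exception e) {
        return "Error: " + e.getMessage();
    }

    // New style: string concatenation invokes e.toString(),
    // which by default is "<class name>: <message>".
    static String viaConcatenation(Exception e) {
        return "Error: " + e;
    }
}
```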

Expand Down Expand Up @@ -367,5 +375,4 @@ public String getClusterUrl() {
ClientDetails getClientDetails() {
return clientDetails;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class ClientRequestProperties implements Serializable, TraceableAttributes {
public static final String OPTION_SERVER_TIMEOUT = "servertimeout";

// If set and positive, indicates the maximum number of HTTP redirects that the client will process. [Integer]
// If set and positive, indicates the maximum number of HTTP redirects that the client will process. [Integer]
public static final String OPTION_CLIENT_MAX_REDIRECT_COUNT = "client_max_redirect_count";
/*
* Matches valid Kusto Timespans: Optionally negative, optional number of days followed by a period, optionally up to 24 as hours followed by a colon,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.http.HttpClient;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;

import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -32,7 +33,7 @@ protected String acquireAccessTokenImpl() throws DataClientException {
try {
return tokenProvider.apply(httpClient);
} catch (Exception e) {
throw new DataClientException(clusterUrl, e.getMessage(), e);
throw new DataClientException(clusterUrl, ExceptionsUtils.getMessageEx(e), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.microsoft.azure.kusto.data.ExponentialRetry;
import com.microsoft.azure.kusto.data.Utils;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import com.microsoft.azure.kusto.data.http.HttpClientFactory;
import com.microsoft.azure.kusto.data.UriUtils;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
Expand Down Expand Up @@ -100,7 +101,8 @@ public static CloudInfo retrieveCloudInfoForCluster(String clusterUrl,
throw new DataServiceException(clusterUrl, "URISyntaxException when trying to retrieve cluster metadata:" + e.getMessage(), e, true);
} catch (IOException ex) {
if (!Utils.isRetriableIOException(ex)) {
throw new DataServiceException(clusterUrl, "IOException when trying to retrieve cluster metadata:" + ex.getMessage(), ex,
throw new DataServiceException(clusterUrl, "IOException when trying to retrieve cluster metadata:" + ExceptionsUtils.getMessageEx(ex),
ex,
Utils.isRetriableIOException(ex));
}
} catch (DataServiceException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ private static WellKnownKustoEndpointsData readInstance() {
return objectMapper.readValue(resourceAsStream, WellKnownKustoEndpointsData.class);
}
} catch (Exception ex) {
ex.printStackTrace();
throw new RuntimeException("Failed to read WellKnownKustoEndpoints.json", ex);
Contributor

If you want, you can annotate with @NotNull.

Collaborator Author

Who? instance?
May I not?

Contributor

The readInstance method, just so callers know not to expect null.

You don't have to.

}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ public OneApiError getApiError() {
try {
apiError = OneApiError.fromJsonObject(objectMapper.readTree(getMessage()).get("error"));
} catch (JsonProcessingException e) {
log.error("failed to parse error from message {} {} ", e.getMessage(), e);
log.error(String.format("failed to parse error from message: '%s' ", e.getMessage()), e);
}
}

return apiError;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.microsoft.azure.kusto.data.exceptions;

public class ExceptionsUtils {
// Useful in IOException, where message might not propagate to the base IOException
public static String getMessageEx(Exception e) {
AsafMah marked this conversation as resolved.
return (e.getMessage() == null && e.getCause() != null) ? e.getCause().getMessage() : e.getMessage();
}
}
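
A small sketch of when the fallback in `getMessageEx` matters: an `IOException` built with a null message and a cause reports `null` from `getMessage()`, so the helper returns the cause's message instead. The method body is copied from the diff above so the example is self-contained; the wrapper class name is hypothetical.

```java
import java.io.IOException;

// Hypothetical wrapper class; getMessageEx has the same logic as in the diff.
final class GetMessageExSketch {
    static String getMessageEx(Exception e) {
        return (e.getMessage() == null && e.getCause() != null) ? e.getCause().getMessage() : e.getMessage();
    }

    // An exception whose own message is null but whose cause carries the detail.
    static Exception wrappedWithoutMessage() {
        return new IOException((String) null, new IllegalStateException("connection reset"));
    }
}
```

Here `getMessageEx(wrappedWithoutMessage())` yields the cause's text, `"connection reset"`, whereas `getMessage()` alone would yield `null`. When the outer exception has its own message, that message is returned unchanged, and when both the message and the cause are missing the result is still `null`.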
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,4 @@ public static HttpClient create(HttpClientProperties properties) {

return HttpClient.createDefault(options);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class MonitoredActivity {
public static void invoke(Runnable runnable, String nameOfSpan) {
Expand Down
1 change: 1 addition & 0 deletions ingest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
<dependency>org.reactivestreams:reactive-streams:jar</dependency>
<dependency>com.microsoft.azure:msal4j:jar</dependency>
<dependency>io.projectreactor:reactor-core:jar</dependency>
<dependency>com.fasterxml.jackson.core:jackson-core:jar</dependency>
Contributor

Why? Don't we have problems with Jackson in general?

Collaborator Author

It just means we get it from Data.

Contributor

I was sure we removed it as a dependency because of blacklisting. Maybe just the core part is OK? This is confusing me.

Collaborator Author

I'm not sure we can have anything good here, but anyway Spark today releases a shaded JAR. I hope this will make those problems obsolete.

Contributor

The issue was with one of our connectors, which blacklisted Jackson. But maybe it was only Jackson JSON and not Jackson core. We'll see, I guess.

</ignoredUsedUndeclaredDependencies>
<ignoreNonCompile>true</ignoreNonCompile>
<ignoredNonTestScopedDependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.microsoft.azure.kusto.ingest;

import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.apache.http.conn.util.InetAddressUtils;

import java.net.InetAddress;
import java.io.IOException;
import java.net.URI;
import java.net.UnknownHostException;
import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions;
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
Expand Down Expand Up @@ -130,14 +130,20 @@ public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceIn
* @see IngestionProperties
*/
protected abstract IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;
throws IngestionClientException, IngestionServiceException, IOException;
Contributor

Breaking change.

Collaborator Author

It's not; it's not public.


public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
// trace ingestFromStream
return MonitoredActivity.invoke(
(SupplierTwoExceptions<IngestionResult, IngestionClientException, IngestionServiceException>) () -> ingestFromStreamImpl(streamSourceInfo,
ingestionProperties),
(SupplierTwoExceptions<IngestionResult, IngestionClientException, IngestionServiceException>) () -> {
try {
return ingestFromStreamImpl(streamSourceInfo,
ingestionProperties);
} catch (IOException e) {
Contributor

It seems that you don't even throw IOException in the end.

Collaborator Author

That's right.

Contributor

Then why do you need it in the signature? Shouldn't it always be wrapped? Then it's also not a breaking change.

Collaborator Author

It's in the signature of the Impl.

Contributor

I don't remember now. I think what I meant is that you can catch the IOException and throw our exception inside the impl.

Either way, it probably doesn't matter if the impl isn't public.

throw new IngestionServiceException(ExceptionsUtils.getMessageEx(e), e);
}
},
getClientType().concat(".ingestFromStream"));
}
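
The pattern discussed in this thread, where the protected implementation may throw `IOException` but the public method wraps it in a service exception, can be sketched in isolation as follows. All types here are hypothetical stand-ins (the real code uses `IngestionServiceException`, `SupplierTwoExceptions`, and `MonitoredActivity.invoke`), so this is an illustration of the wrapping technique rather than the SDK's implementation.

```java
import java.io.IOException;

// Hypothetical sketch of wrapping a checked IOException at the public boundary.
final class WrapCheckedExceptionSketch {
    // Stand-in for the SDK's service exception.
    static class ServiceException extends Exception {
        ServiceException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for a supplier type that cannot throw IOException.
    @FunctionalInterface
    interface ServiceSupplier<T> {
        T get() throws ServiceException;
    }

    // Stand-in for the implementation method, which may throw IOException.
    @FunctionalInterface
    interface IoSupplier<T> {
        T get() throws IOException;
    }

    // Stand-in for a tracing wrapper that only accepts ServiceSupplier.
    static <T> T invoke(ServiceSupplier<T> supplier) throws ServiceException {
        return supplier.get();
    }

    // Public entry point: the IOException never escapes; it is wrapped with its cause kept.
    static <T> T callWrapped(IoSupplier<T> impl) throws ServiceException {
        return invoke(() -> {
            try {
                return impl.get();
            } catch (IOException e) {
                throw new ServiceException(e.getMessage(), e);
            }
        });
    }
}
```

Because the wrapping happens inside the lambda, the public signature keeps its original `throws` clause, which is the point the author makes about the change not affecting public callers.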

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,10 @@ public IngestionMapping.IngestionMappingKind getIngestionMappingKind() {
public boolean isCompressible() {
return compressible;
}

public boolean isJsonFormat() {
return this.equals(JSON) || this.equals(MULTIJSON) || this.equals(SINGLEJSON);
}
}

public enum IngestionReportLevel {
Expand Down
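
A reduced, self-contained sketch of the `isJsonFormat` helper added above. Only four constants are included for illustration (the real enum has more and carries extra fields), and `==` is used in place of `equals`, which behaves the same for enum constants.

```java
// Hypothetical reduced version of the data format enum, for illustration only.
final class DataFormatSketch {
    enum DataFormat {
        CSV, JSON, MULTIJSON, SINGLEJSON;

        // True for the JSON-family formats named in the diff.
        boolean isJsonFormat() {
            return this == JSON || this == MULTIJSON || this == SINGLEJSON;
        }
    }
}
```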