Managed streaming - better estimation for decision for streaming vs queuing #368

Open
wants to merge 24 commits into base: master
Changes from 20 commits
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unknown
### Added
- A new heuristic for choosing between queuing and streaming in the managed streaming client. By default, the client
compares an estimate of the data size against the 4 MB limit, after dividing or multiplying by a factor described by the consts:

This also allows users to stream uncompressed data larger than 4 MB.
- A disableRetries option in the client options. It defaults to true only for streaming clients: if the stream is not
repeatable it will simply fail, so it is better to let the user deal with it. (The alternative considered was to always
create repeatable streams, as in managed streaming, but this PR also tries to avoid that.)
### Fixed
- Improved some error messages

## [5.2.0] - 2024-08-27
### Fixed
- Used Msal user prompt old code which is deprecated in the new version coming from last bom update resulted in method not found exception.
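
The changelog entry above describes the new heuristic only in outline, and the consts it refers to are not shown. As a rough illustration of the idea, here is a minimal sketch that divides the raw size of uncompressed data by an assumed compression ratio before comparing it with the 4 MB limit. The class name, the method names, and the ratio of 3.0 are all hypothetical and are not taken from this PR.

```java
final class StreamingSizeHeuristic {
    // The 4 MB streaming limit mentioned in the changelog entry.
    static final long MAX_STREAMING_BYTES = 4L * 1024 * 1024;

    // Hypothetical factor: assume uncompressed data shrinks about 3x when compressed.
    static final double ASSUMED_COMPRESSION_RATIO = 3.0;

    // Estimate the payload size that would count against the limit.
    static long estimatePayloadSize(long rawSizeBytes, boolean alreadyCompressed) {
        if (alreadyCompressed) {
            return rawSizeBytes; // nothing left to gain from compression
        }
        return (long) (rawSizeBytes / ASSUMED_COMPRESSION_RATIO);
    }

    // true: try streaming; false: fall back to queued ingestion.
    static boolean shouldStream(long rawSizeBytes, boolean alreadyCompressed) {
        return estimatePayloadSize(rawSizeBytes, alreadyCompressed) <= MAX_STREAMING_BYTES;
    }
}
```

Under these assumptions, 6 MB of uncompressed data is estimated at 2 MB and would be streamed, while 6 MB of already-compressed data would be queued.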
@@ -4,9 +4,11 @@
package com.microsoft.azure.kusto.data;

import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.http.HttpClientFactory;
import org.apache.http.impl.client.CloseableHttpClient;

import java.net.URISyntaxException;
import java.util.Optional;

public class ClientFactory {
private ClientFactory() {
@@ -73,7 +75,9 @@ public static StreamingClient createStreamingClient(ConnectionStringBuilder csb)
* @throws URISyntaxException if the cluster URL is invalid
*/
public static StreamingClient createStreamingClient(ConnectionStringBuilder csb, HttpClientProperties properties) throws URISyntaxException {
return new ClientImpl(csb, properties);
HttpClientProperties httpClientProperties = Optional.ofNullable(properties)
.orElse(HttpClientProperties.builder().disableRetries().build());
return new ClientImpl(csb, HttpClientFactory.create(httpClientProperties), false);
}
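
One detail of the `Optional.ofNullable(...).orElse(...)` pattern used in `createStreamingClient`: the argument to `orElse` is evaluated even when `properties` is non-null, so the default object is built on every call. That is harmless when the default is cheap to build, and `orElseGet` defers the work if it is not. A small sketch of the difference, where `Props` is a hypothetical stand-in for `HttpClientProperties` and the counter exists only to make the evaluation visible:

```java
import java.util.Optional;

final class DefaultPropsDemo {
    // Hypothetical stand-in for HttpClientProperties.
    static final class Props {
        final boolean disableRetries;

        Props(boolean disableRetries) {
            this.disableRetries = disableRetries;
        }
    }

    // Counts how many times the default was constructed.
    static int defaultsBuilt = 0;

    static Props buildDefault() {
        defaultsBuilt++;
        return new Props(true);
    }

    // orElse: buildDefault() runs before orElse is called, even if provided != null.
    static Props resolveEager(Props provided) {
        return Optional.ofNullable(provided).orElse(buildDefault());
    }

    // orElseGet: buildDefault() runs only when provided == null.
    static Props resolveLazy(Props provided) {
        return Optional.ofNullable(provided).orElseGet(DefaultPropsDemo::buildDefault);
    }
}
```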

/**
@@ -14,6 +14,7 @@
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.KustoClientInvalidConnectionStringException;
import com.microsoft.azure.kusto.data.exceptions.KustoServiceQueryError;
import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import com.microsoft.azure.kusto.data.http.HttpClientFactory;
import com.microsoft.azure.kusto.data.http.HttpPostUtils;
import com.microsoft.azure.kusto.data.http.UncloseableStream;
@@ -176,9 +177,9 @@ private KustoOperationResult executeImpl(String database, String command, Client
return new KustoOperationResult(response, clusterEndpoint.endsWith("v2/rest/query") ? "v2" : "v1");
} catch (KustoServiceQueryError e) {
throw new DataServiceException(clusterEndpoint,
"Error found while parsing json response as KustoOperationResult:" + e.getMessage(), e, e.isPermanent());
"Error found while parsing json response as KustoOperationResult:" + e, e, e.isPermanent());
Contributor

So here you remove getMessage and don't use getMessageEx. Why? Are you sure this won't give us worse results?

Collaborator Author

see my answer there

} catch (Exception e) {
throw new DataClientException(clusterEndpoint, e.getMessage(), e);
throw new DataClientException(clusterEndpoint, ExceptionsUtils.getMessageEx(e), e);
}
}

@@ -16,6 +16,7 @@ public class HttpClientProperties {
private final HttpHost proxy;
private final HttpRoutePlanner routePlanner;
private final String[] supportedProtocols;
private final boolean disableRetries;

private HttpClientProperties(HttpClientPropertiesBuilder builder) {
this.maxIdleTime = builder.maxIdleTime;
@@ -26,6 +27,7 @@ private HttpClientProperties(HttpClientPropertiesBuilder builder) {
this.proxy = builder.proxy;
this.routePlanner = builder.routePlanner;
this.supportedProtocols = builder.supportedProtocols;
this.disableRetries = builder.disableRetries;
}

/**
@@ -48,7 +50,7 @@ public Integer maxIdleTime() {
}

/**
* Indicates whether a custom connection keep-alive time should be used. If set to {@code false}, the HTTP
* Indicates whether or not a custom connection keep-alive time should be used. If set to {@code false}, the HTTP
* client will use the default connection keep-alive strategy, which is to use only the server instructions
* (if any) set in the {@code Keep-Alive} response header.
* If set to {@code true}, the HTTP client will use a custom connection keep-alive strategy which uses the
@@ -111,6 +113,10 @@ public String[] supportedProtocols() {
return supportedProtocols;
}

public boolean isDisableRetries() {
return disableRetries;
}

public static class HttpClientPropertiesBuilder {

private Integer maxIdleTime = 120;
@@ -121,6 +127,7 @@ public static class HttpClientPropertiesBuilder {
private HttpHost proxy = null;
private HttpRoutePlanner routePlanner = null;
private String[] supportedProtocols = null;
private boolean disableRetries;

private HttpClientPropertiesBuilder() {
}
@@ -226,9 +233,18 @@ public HttpClientPropertiesBuilder supportedProtocols(String[] tlsProtocols) {
return this;
}

/**
* Disable all http client internal retries.
*
* @return the builder instance
*/
public HttpClientPropertiesBuilder disableRetries() {
this.disableRetries = true;
return this;
}

public HttpClientProperties build() {
return new HttpClientProperties(this);
}

}
}
@@ -2,7 +2,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.kusto.data.http.HttpPostUtils;

class IngestionSourceStorage {
public String sourceUri;
@@ -5,6 +5,7 @@

import com.microsoft.azure.kusto.data.exceptions.DataClientException;

import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import org.apache.http.client.HttpClient;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -32,7 +33,7 @@ protected String acquireAccessTokenImpl() throws DataClientException {
try {
return tokenProvider.apply(httpClient);
} catch (Exception e) {
throw new DataClientException(clusterUrl, e.getMessage(), e);
throw new DataClientException(clusterUrl, ExceptionsUtils.getMessageEx(e), e);
}
}

@@ -6,6 +6,7 @@
import com.microsoft.azure.kusto.data.ExponentialRetry;
import com.microsoft.azure.kusto.data.Utils;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import com.microsoft.azure.kusto.data.http.HttpClientFactory;
import com.microsoft.azure.kusto.data.instrumentation.SupplierOneException;
import com.microsoft.azure.kusto.data.UriUtils;
@@ -104,7 +105,8 @@ public static CloudInfo retrieveCloudInfoForCluster(String clusterUrl,
throw new DataServiceException(clusterUrl, "URISyntaxException when trying to retrieve cluster metadata:" + e.getMessage(), e, true);
} catch (IOException ex) {
if (!Utils.isRetriableIOException(ex)) {
throw new DataServiceException(clusterUrl, "IOException when trying to retrieve cluster metadata:" + ex.getMessage(), ex,
throw new DataServiceException(clusterUrl, "IOException when trying to retrieve cluster metadata:" + ExceptionsUtils.getMessageEx(ex),
ex,
Utils.isRetriableIOException(ex));
}
} catch (DataServiceException e) {
@@ -44,8 +44,7 @@ private static WellKnownKustoEndpointsData readInstance() {
return objectMapper.readValue(resourceAsStream, WellKnownKustoEndpointsData.class);
}
} catch (Exception ex) {
ex.printStackTrace();
throw new RuntimeException("Failed to read WellKnownKustoEndpoints.json", ex);
Contributor

If you want you can annotate with @NotNull

Collaborator Author

who? instance?
May I not?

Contributor

The readInstance method.

Just so callers know not to expect null.

You don't have to.

}
return null;
}
}
@@ -0,0 +1,8 @@
package com.microsoft.azure.kusto.data.exceptions;

public class ExceptionsUtils {
// Useful in IOException, where message might not propagate to the base IOException
public static String getMessageEx(Exception e) {
return (e.getMessage() == null && e.getCause() != null) ? e.getCause().getMessage() : e.getMessage();
}
}
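
To make the behaviour of `getMessageEx` concrete, here is a self-contained sketch that copies its logic and exercises it on an `IOException` whose own message is null but whose cause carries one. The class name `MessageExDemo` and the helper `wrappedWithoutMessage` are hypothetical and exist only for this illustration.

```java
import java.io.IOException;

final class MessageExDemo {
    // Same logic as getMessageEx in the diff, copied so the example stands alone.
    static String getMessageEx(Exception e) {
        return (e.getMessage() == null && e.getCause() != null) ? e.getCause().getMessage() : e.getMessage();
    }

    // An IOException built with a null message and a cause that has a message.
    static IOException wrappedWithoutMessage() {
        return new IOException((String) null, new IllegalStateException("connection reset"));
    }

    public static void main(String[] args) {
        // Falls back to the cause's message because the outer message is null.
        System.out.println(getMessageEx(wrappedWithoutMessage()));
        // Uses the exception's own message when it has one.
        System.out.println(getMessageEx(new IOException("top-level")));
    }
}
```

The helper only looks one level down. When neither the exception nor its direct cause has a message, it still returns null, and a caller that concatenates the result into a string would see the text "null".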
@@ -44,6 +44,9 @@ public static CloseableHttpClient create(HttpClientProperties providedProperties
.evictIdleConnections(properties.maxIdleTime(), TimeUnit.SECONDS)
.disableRedirectHandling();

if (properties.isDisableRetries()) {
Contributor

This is at the HTTP level, which I think is completely separate from what we do in streaming ingest.
Did you check what it actually does and when it runs? I think it does save the request, which will work for the "normal" streaming client.

Collaborator Author

Well, unless I missed it, it didn't, and that's why I use it here.

Collaborator Author

Maybe I should try to make it throw again.

Contributor

Did you?

Collaborator Author
@ohadbitt, Oct 8, 2024

Did not manage to. Will remove this disable retries option.

httpClientBuilder.disableAutomaticRetries();
}
if (properties.isKeepAlive()) {
final ConnectionKeepAliveStrategy keepAliveStrategy = new CustomConnectionKeepAliveStrategy(properties.maxKeepAliveTime());
httpClientBuilder.setKeepAliveStrategy(keepAliveStrategy);
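
The thread above turns on whether a request body can be sent a second time. The underlying issue can be shown with the JDK alone: an `InputStream` that has been consumed yields nothing on a second read, whereas a body buffered into memory can hand out a fresh stream for every attempt. This is a minimal sketch of that idea, not the client's actual retry path; `RepeatableStreamDemo`, `readAll`, and `makeRepeatable` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

final class RepeatableStreamDemo {
    // Drain the stream into a byte array; the stream is exhausted afterwards.
    static byte[] readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Buffer a one-shot stream once, then supply a new stream per attempt.
    static Supplier<InputStream> makeRepeatable(InputStream oneShot) {
        byte[] bytes = readAll(oneShot);
        return () -> new ByteArrayInputStream(bytes);
    }
}
```

The cost of this approach is that the whole payload is held in memory, which is presumably part of why the PR tries to avoid always creating repeatable streams.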
@@ -74,7 +74,7 @@ public static String post(CloseableHttpClient httpClient, String urlStr, Abstrac
}
}
} catch (IOException e) {
throw new DataServiceException(urlStr, "IOException in post request:" + e.getMessage(), !Utils.isRetriableIOException(e));
throw new DataServiceException(urlStr, "IOException in post request:" + ExceptionsUtils.getMessageEx(e), e, !Utils.isRetriableIOException(e));
}

return null;
1 change: 1 addition & 0 deletions ingest/pom.xml
@@ -106,6 +106,7 @@
</ignoredUnusedDeclaredDependencies>
<ignoredUsedUndeclaredDependencies>
<dependency>io.projectreactor:reactor-core:jar</dependency>
<dependency>com.fasterxml.jackson.core:jackson-core:jar</dependency>
Contributor

Why? Don't we have problems with Jackson in general?

Collaborator Author

It just means we get it from Data

Contributor

I was sure we removed it as a dependency because of blacklisting. Maybe just the core part is OK? This is confusing me.

Collaborator Author

I'm not sure we can have anything good here,
but anyway Spark today releases a shaded JAR. I hope this will make those problems obsolete.

Contributor

The issue was with one of our connectors, which blacklisted jackson. But maybe it was only jackson json and not jackson core. We'll see I guess

</ignoredUsedUndeclaredDependencies>
<ignoreNonCompile>true</ignoreNonCompile>
</configuration>
@@ -1,11 +1,11 @@
package com.microsoft.azure.kusto.ingest;

import com.microsoft.azure.kusto.data.exceptions.ExceptionsUtils;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.apache.http.conn.util.InetAddressUtils;

import java.net.InetAddress;
import java.io.IOException;
import java.net.URI;
import java.net.UnknownHostException;
import com.microsoft.azure.kusto.data.instrumentation.SupplierTwoExceptions;
import com.microsoft.azure.kusto.data.instrumentation.TraceableAttributes;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
@@ -130,14 +130,20 @@ public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceIn
* @see IngestionProperties
*/
protected abstract IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException;
throws IngestionClientException, IngestionServiceException, IOException;
Contributor

Breaking change

Collaborator Author

It's not. It's not public.


public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
// trace ingestFromStream
return MonitoredActivity.invoke(
(SupplierTwoExceptions<IngestionResult, IngestionClientException, IngestionServiceException>) () -> ingestFromStreamImpl(streamSourceInfo,
ingestionProperties),
(SupplierTwoExceptions<IngestionResult, IngestionClientException, IngestionServiceException>) () -> {
try {
return ingestFromStreamImpl(streamSourceInfo,
ingestionProperties);
} catch (IOException e) {
Contributor

It seems that you don't even throw IOException, in the end

Collaborator Author

That's right.

Contributor

Then why do you need it in the signature? Shouldn't it always be wrapped, and then it's also not a breaking change?

Collaborator Author

It's in the signature of the Impl.

Contributor

I don't remember now. I think what I meant is that you can catch and throw our exception inside the impl.

Either way, it probably doesn't matter if the impl isn't public.

throw new IngestionServiceException(ExceptionsUtils.getMessageEx(e), e);
}
},
getClientType().concat(".ingestFromStream"));
}
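
The pattern discussed in this thread, an implementation method that may throw `IOException` wrapped inside a lambda so the public method's signature stays unchanged, can be reduced to a small standalone sketch. Everything here is hypothetical: `SupplierTwo` stands in for `SupplierTwoExceptions`, and `ClientException` and `ServiceException` stand in for the ingestion exception types.

```java
import java.io.IOException;

final class WrapCheckedDemo {
    // Hypothetical stand-in for SupplierTwoExceptions.
    @FunctionalInterface
    interface SupplierTwo<T, E1 extends Exception, E2 extends Exception> {
        T get() throws E1, E2;
    }

    static final class ClientException extends Exception {
    }

    static final class ServiceException extends Exception {
        ServiceException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for the monitored invocation: it only allows the two declared types.
    static <T> T invoke(SupplierTwo<T, ClientException, ServiceException> supplier)
            throws ClientException, ServiceException {
        return supplier.get();
    }

    // Non-public implementation that additionally declares IOException.
    static String ingestImpl(boolean fail) throws ClientException, ServiceException, IOException {
        if (fail) {
            throw new IOException("stream could not be read");
        }
        return "ok";
    }

    // Public-facing method: IOException is wrapped, so its signature does not change.
    static String ingest(boolean fail) throws ClientException, ServiceException {
        return invoke(() -> {
            try {
                return ingestImpl(fail);
            } catch (IOException e) {
                throw new ServiceException(e.getMessage(), e);
            }
        });
    }
}
```

Callers of `ingest` never see `IOException` directly; they receive a `ServiceException` whose cause is the original exception.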

@@ -431,6 +431,10 @@ public IngestionMapping.IngestionMappingKind getIngestionMappingKind() {
public boolean isCompressible() {
return compressible;
}

public boolean isJsonFormat() {
return this.equals(JSON) || this.equals(MULTIJSON) || this.equals(SINGLEJSON);
}
}

public enum IngestionReportLevel {