Skip to content

Commit

Permalink
ML: fix delayed data annotations on secured cluster (#37193)
Browse files Browse the repository at this point in the history
* changing executing context for writing annotation

* adjusting user

* removing unused import
  • Loading branch information
benwtrent committed Jan 7, 2019
1 parent 6205a9c commit 4554369
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.security.user.SystemUser;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
Expand Down Expand Up @@ -226,22 +226,24 @@ private Annotation createAnnotation(Date startTime, Date endTime, String msg) {
Date currentTime = new Date(currentTimeSupplier.get());
return new Annotation(msg,
currentTime,
SystemUser.NAME,
XPackUser.NAME,
startTime,
endTime,
jobId,
currentTime,
SystemUser.NAME,
XPackUser.NAME,
"annotation");
}

private String addAndSetDelayedDataAnnotation(Annotation annotation) {
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
request.source(xContentBuilder);
IndexResponse response = client.index(request).actionGet();
lastDataCheckAnnotation = annotation;
return response.getId();
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
IndexResponse response = client.index(request).actionGet();
lastDataCheckAnnotation = annotation;
return response.getId();
}
} catch (IOException ex) {
String errorMessage = "[" + jobId + "] failed to create annotation for delayed data checker.";
LOGGER.error(errorMessage, ex);
Expand All @@ -252,7 +254,7 @@ private String addAndSetDelayedDataAnnotation(Annotation annotation) {

private void updateAnnotation(Annotation annotation) {
Annotation updatedAnnotation = new Annotation(lastDataCheckAnnotation);
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
updatedAnnotation.setModifiedUsername(XPackUser.NAME);
updatedAnnotation.setModifiedTime(new Date(currentTimeSupplier.get()));
updatedAnnotation.setAnnotation(annotation.getAnnotation());
updatedAnnotation.setTimestamp(annotation.getTimestamp());
Expand All @@ -261,8 +263,10 @@ private void updateAnnotation(Annotation annotation) {
IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
indexRequest.id(lastDataCheckAnnotationId);
indexRequest.source(xContentBuilder);
client.index(indexRequest).actionGet();
lastDataCheckAnnotation = updatedAnnotation;
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
client.index(indexRequest).actionGet();
lastDataCheckAnnotation = updatedAnnotation;
}
} catch (IOException ex) {
String errorMessage = "[" + jobId + "] failed to update annotation for delayed data checker.";
LOGGER.error(errorMessage, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.security.user.SystemUser;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
Expand Down Expand Up @@ -273,12 +273,12 @@ public void testRealtimeRun() throws Exception {

Annotation expectedAnnotation = new Annotation(msg,
new Date(currentTime),
SystemUser.NAME,
XPackUser.NAME,
bucket.getTimestamp(),
new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000),
jobId,
new Date(currentTime),
SystemUser.NAME,
XPackUser.NAME,
"annotation");

IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
Expand Down Expand Up @@ -314,7 +314,7 @@ public void testRealtimeRun() throws Exception {
Annotation updatedAnnotation = new Annotation(expectedAnnotation);
updatedAnnotation.setAnnotation(msg);
updatedAnnotation.setModifiedTime(new Date(currentTime));
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
updatedAnnotation.setModifiedUsername(XPackUser.NAME);
updatedAnnotation.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000));
try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
indexRequest.source(xContentBuilder);
Expand Down

0 comments on commit 4554369

Please sign in to comment.