Skip to content

Commit

Permalink
[INGEST] Interrupt the current thread if evaluation grok expressions …
Browse files Browse the repository at this point in the history
…take too long (#31024)

This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search()
This method can hang forever if the regex expression is too complex.

The thread interrupter in the background checks every 3 seconds whether there are threads
execution the org.joni.Matcher#search() method for longer than 5 seconds and
if so interrupts these threads.

Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and
if so returns org.joni.Matcher#INTERRUPTED

Closes #28731
  • Loading branch information
martijnvg authored Jun 12, 2018
1 parent 1dbe554 commit 6030d4b
Show file tree
Hide file tree
Showing 11 changed files with 385 additions and 53 deletions.
18 changes: 18 additions & 0 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,24 @@ The above request will return a response body containing a key-value representat

This can be useful to reference as the built-in patterns change across versions.

[[grok-watchdog]]
==== Grok watchdog

Grok expressions that take too long to execute are interrupted and
the grok processor then fails with an exception. The grok
processor has a watchdog thread that determines when evaluation of
a grok expression takes too long and is controlled by the following
settings:

[[grok-watchdog-options]]
.Grok watchdog settings
[options="header"]
|======
| Name | Default | Description
| `ingest.grok.watchdog.interval` | 1s | How often to check whether there are grok evaluations that take longer than the maximum allowed execution time.
| `ingest.grok.watchdog.max_execution_time` | 1s | The maximum allowed execution of a grok expression evaluation.
|======

[[gsub-processor]]
=== Gsub Processor
Converts a string field by applying a regular expression and a replacement.
Expand Down
53 changes: 41 additions & 12 deletions libs/grok/src/main/java/org/elasticsearch/grok/Grok.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,24 @@ public final class Grok {
private final Map<String, String> patternBank;
private final boolean namedCaptures;
private final Regex compiledExpression;
private final ThreadWatchdog threadWatchdog;

public Grok(Map<String, String> patternBank, String grokPattern) {
this(patternBank, grokPattern, true);
this(patternBank, grokPattern, true, ThreadWatchdog.noop());
}

@SuppressWarnings("unchecked")

public Grok(Map<String, String> patternBank, String grokPattern, ThreadWatchdog threadWatchdog) {
this(patternBank, grokPattern, true, threadWatchdog);
}

Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures) {
this(patternBank, grokPattern, namedCaptures, ThreadWatchdog.noop());
}

private Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, ThreadWatchdog threadWatchdog) {
this.patternBank = patternBank;
this.namedCaptures = namedCaptures;
this.threadWatchdog = threadWatchdog;

for (Map.Entry<String, String> entry : patternBank.entrySet()) {
String name = entry.getKey();
Expand Down Expand Up @@ -163,7 +172,13 @@ public String toRegex(String grokPattern) {
byte[] grokPatternBytes = grokPattern.getBytes(StandardCharsets.UTF_8);
Matcher matcher = GROK_PATTERN_REGEX.matcher(grokPatternBytes);

int result = matcher.search(0, grokPatternBytes.length, Option.NONE);
int result;
try {
threadWatchdog.register();
result = matcher.search(0, grokPatternBytes.length, Option.NONE);
} finally {
threadWatchdog.unregister();
}
if (result != -1) {
Region region = matcher.getEagerRegion();
String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern);
Expand Down Expand Up @@ -205,7 +220,13 @@ public String toRegex(String grokPattern) {
*/
public boolean match(String text) {
Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8));
int result = matcher.search(0, text.length(), Option.DEFAULT);
int result;
try {
threadWatchdog.register();
result = matcher.search(0, text.length(), Option.DEFAULT);
} finally {
threadWatchdog.unregister();
}
return (result != -1);
}

Expand All @@ -220,8 +241,20 @@ public Map<String, Object> captures(String text) {
byte[] textAsBytes = text.getBytes(StandardCharsets.UTF_8);
Map<String, Object> fields = new HashMap<>();
Matcher matcher = compiledExpression.matcher(textAsBytes);
int result = matcher.search(0, textAsBytes.length, Option.DEFAULT);
if (result != -1 && compiledExpression.numberOfNames() > 0) {
int result;
try {
threadWatchdog.register();
result = matcher.search(0, textAsBytes.length, Option.DEFAULT);
} finally {
threadWatchdog.unregister();
}
if (result == Matcher.INTERRUPTED) {
throw new RuntimeException("grok pattern matching was interrupted after [" +
threadWatchdog.maxExecutionTimeInMillis() + "] ms");
} else if (result == Matcher.FAILED) {
// TODO: I think we should throw an error here?
return null;
} else if (compiledExpression.numberOfNames() > 0) {
Region region = matcher.getEagerRegion();
for (Iterator<NameEntry> entry = compiledExpression.namedBackrefIterator(); entry.hasNext();) {
NameEntry e = entry.next();
Expand All @@ -235,13 +268,9 @@ public Map<String, Object> captures(String text) {
break;
}
}

}
return fields;
} else if (result != -1) {
return fields;
}
return null;
return fields;
}

public static Map<String, String> getBuiltinPatterns() {
Expand Down
148 changes: 148 additions & 0 deletions libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.grok;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;

/**
* Protects against long running operations that happen between the register and unregister invocations.
* Threads that invoke {@link #register()}, but take too long to invoke the {@link #unregister()} method
* will be interrupted.
*
* This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because
* it can end up spinning endlessly if the regular expression is too complex. Joni has checks
* that for every 30k iterations it checks if the current thread is interrupted and if so
* returns {@link org.joni.Matcher#INTERRUPTED}.
*/
public interface ThreadWatchdog {

/**
* Registers the current thread and interrupts the current thread
* if the takes too long for this thread to invoke {@link #unregister()}.
*/
void register();

/**
* @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister()}
* after {@link #register()} has been invoked before this ThreadWatchDog starts to interrupting that thread.
*/
long maxExecutionTimeInMillis();

/**
* Unregisters the current thread and prevents it from being interrupted.
*/
void unregister();

/**
* Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()}
* and not {@link #unregister()} and have been in this state for longer than the specified max execution interval and
* then interrupts these threads.
*
* @param interval The fixed interval to check if there are threads to interrupt
* @param maxExecutionTime The time a thread has the execute an operation.
* @param relativeTimeSupplier A supplier that returns relative time
* @param scheduler A scheduler that is able to execute a command for each fixed interval
*/
static ThreadWatchdog newInstance(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
}

/**
* @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions.
*/
static ThreadWatchdog noop() {
return Noop.INSTANCE;
}

class Noop implements ThreadWatchdog {

private static final Noop INSTANCE = new Noop();

private Noop() {
}

@Override
public void register() {
}

@Override
public long maxExecutionTimeInMillis() {
return Long.MAX_VALUE;
}

@Override
public void unregister() {
}
}

class Default implements ThreadWatchdog {

private final long interval;
private final long maxExecutionTime;
private final LongSupplier relativeTimeSupplier;
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();

private Default(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
this.interval = interval;
this.maxExecutionTime = maxExecutionTime;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
scheduler.apply(interval, this::interruptLongRunningExecutions);
}

public void register() {
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
assert previousValue == null;
}

@Override
public long maxExecutionTimeInMillis() {
return maxExecutionTime;
}

public void unregister() {
Long previousValue = registry.remove(Thread.currentThread());
assert previousValue != null;
}

private void interruptLongRunningExecutions() {
final long currentRelativeTime = relativeTimeSupplier.getAsLong();
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) {
entry.getKey().interrupt();
// not removing the entry here, this happens in the unregister() method.
}
}
scheduler.apply(interval, this::interruptLongRunningExecutions);
}

}

}
38 changes: 31 additions & 7 deletions libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,24 @@
package org.elasticsearch.grok;

import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;


public class GrokTests extends ESTestCase {
private Map<String, String> basePatterns;

@Before
public void setup() {
basePatterns = Grok.getBuiltinPatterns();
}
private static final Map<String, String> basePatterns = Grok.getBuiltinPatterns();

public void testMatchWithoutCaptures() {
String line = "value";
Expand Down Expand Up @@ -415,4 +412,31 @@ public void testMultipleNamedCapturesWithSameName() {
expected.put("num", "1");
assertThat(grok.captures("12"), equalTo(expected));
}

public void testExponentialExpressions() {
AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed

String grokPattern = "Bonsuche mit folgender Anfrage: Belegart->\\[%{WORD:param2},(?<param5>(\\s*%{NOTSPACE})*)\\] " +
"Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}";
String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " +
"Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018";
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
Thread t = new Thread(() -> {
if (run.get()) {
command.run();
}
});
t.start();
return null;
};
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));
run.set(false);
assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms"));
}
}
Loading

0 comments on commit 6030d4b

Please sign in to comment.