Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into lucene_snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticsearchmachine committed Aug 26, 2024
2 parents 8637199 + 785fe53 commit 1f3489e
Show file tree
Hide file tree
Showing 39 changed files with 773 additions and 230 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/111516.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 111516
summary: Adding support for `allow_partial_search_results` in PIT
area: Search
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/112139.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 112139
summary: Fix NPE when executing doc value queries over shape geometries with empty
segments
area: Geo
type: bug
issues: []
7 changes: 4 additions & 3 deletions docs/reference/ingest/processors/community-id.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ configuration is required.
| `source_port` | no | `source.port` | Field containing the source port.
| `destination_ip` | no | `destination.ip` | Field containing the destination IP address.
| `destination_port` | no | `destination.port` | Field containing the destination port.
| `iana_number` | no | `network.iana_number` | Field containing the IANA number. The following protocol numbers are currently supported: `1` ICMP, `2` IGMP, `6` TCP, `17` UDP, `47` GRE, `58` ICMP IPv6, `88` EIGRP, `89` OSPF, `103` PIM, and `132` SCTP.
| `iana_number` | no | `network.iana_number` | Field containing the IANA number.
| `icmp_type` | no | `icmp.type` | Field containing the ICMP type.
| `icmp_code` | no | `icmp.code` | Field containing the ICMP code.
| `transport` | no | `network.transport` | Field containing the transport protocol.
Used only when the `iana_number` field is not present.
| `transport` | no | `network.transport` | Field containing the transport protocol name or number.
Used only when the `iana_number` field is not present. The following protocol names are currently supported:
`ICMP`, `IGMP`, `TCP`, `UDP`, `GRE`, `ICMP IPv6`, `EIGRP`, `OSPF`, `PIM`, and `SCTP`.
| `target_field` | no | `network.community_id` | Output field for the community ID.
| `seed` | no | `0` | Seed for the community ID hash. Must be between
0 and 65535 (inclusive). The seed can prevent hash collisions between network domains, such as
Expand Down
38 changes: 38 additions & 0 deletions docs/reference/search/point-in-time-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,44 @@ IMPORTANT: The open point in time request and each subsequent search request can
return different `id`; thus always use the most recently received `id` for the
next search request.

In addition to the `keep_alive` parameter, the `allow_partial_search_results` parameter
can also be defined.
This parameter determines whether the <<point-in-time-api, point in time (PIT)>>
should tolerate unavailable shards or <<shard-failures, shard failures>> when
initially creating the PIT.
If set to true, the PIT will be created with the available shards, along with a
reference to any missing ones.
If set to false, the operation will fail if any shard is unavailable.
The default value is false.

The PIT response includes a summary of the total number of shards, as well as the number
of successful shards when creating the PIT.

[source,console]
--------------------------------------------------
POST /my-index-000001/_pit?keep_alive=1m&allow_partial_search_results=true
--------------------------------------------------
// TEST[setup:my_index]

[source,js]
--------------------------------------------------
{
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=",
"_shards": {
"total": 10,
"successful": 10,
"skipped": 0,
"failed": 0
}
}
--------------------------------------------------
// NOTCONSOLE

When a PIT that contains shard failures is used in a search request, the missing are
always reported in the search response as a NoShardAvailableActionException exception.
To get rid of these exceptions, a new PIT needs to be created so that shards missing
from the previous PIT can be handled, assuming they become available in the meantime.

[[point-in-time-keep-alive]]
==== Keeping point in time alive
The `keep_alive` parameter, which is passed to a open point in time request and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ The search response includes an array of `sort` values for each hit:
"_id" : "654322",
"_score" : null,
"_source" : ...,
"sort" : [
"sort" : [
1463538855,
"654322"
"654322"
]
},
{
Expand All @@ -118,7 +118,7 @@ The search response includes an array of `sort` values for each hit:
"_source" : ...,
"sort" : [ <1>
1463538857,
"654323"
"654323"
]
}
]
Expand Down Expand Up @@ -150,7 +150,7 @@ GET twitter/_search
--------------------------------------------------
//TEST[continued]

Repeat this process by updating the `search_after` array every time you retrieve a
Repeat this process by updating the `search_after` array every time you retrieve a
new page of results. If a <<near-real-time,refresh>> occurs between these requests,
the order of your results may change, causing inconsistent results across pages. To
prevent this, you can create a <<point-in-time-api,point in time (PIT)>> to
Expand All @@ -167,10 +167,12 @@ The API returns a PIT ID.
[source,console-result]
----
{
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==",
"_shards": ...
}
----
// TESTRESPONSE[s/"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": "$body._shards"/]

To get the first page of results, submit a search request with a `sort`
argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class MergingDigest extends AbstractTDigest {
private final int[] order;

// if true, alternate upward and downward merge passes
public boolean useAlternatingSort = false;
public boolean useAlternatingSort = true;
// if true, use higher working value of compression during construction, then reduce on presentation
public boolean useTwoLevelCompression = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
Expand All @@ -35,6 +36,7 @@
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
Expand Down Expand Up @@ -457,6 +459,16 @@ public void testTrimId() throws Exception {
indexName = bulkResponse.getItems()[0].getIndex();
}
client().admin().indices().refresh(new RefreshRequest(dataStreamName)).actionGet();

// In rare cases we can end up with a single segment shard, which means we can't trim away the _id later.
// So update an existing doc to create a new segment without adding a new document after force merging:
var indexRequest = new IndexRequest(indexName).setIfPrimaryTerm(1L)
.setIfSeqNo((numBulkRequests * numDocsPerBulk) - 1)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
indexRequest.source(DOC.replace("$time", formatInstant(time.minusMillis(1))), XContentType.JSON);
var res = client().index(indexRequest).actionGet();
assertThat(res.status(), equalTo(RestStatus.OK));
assertThat(res.getVersion(), equalTo(2L));
}

// Check whether there are multiple segments:
Expand Down Expand Up @@ -494,7 +506,7 @@ public void testTrimId() throws Exception {
assertThat(retentionLeasesStats.retentionLeases().leases(), hasSize(1));
assertThat(
retentionLeasesStats.retentionLeases().leases().iterator().next().retainingSequenceNumber(),
equalTo((long) numBulkRequests * numDocsPerBulk)
equalTo((long) numBulkRequests * numDocsPerBulk + 1)
);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private static Flow buildFlow(
}
flow.protocol = Transport.fromObject(protocol);

switch (flow.protocol) {
switch (flow.protocol.getType()) {
case Tcp, Udp, Sctp -> {
flow.sourcePort = parseIntFromObjectOrString(sourcePort.get(), "source port");
if (flow.sourcePort < 1 || flow.sourcePort > 65535) {
Expand Down Expand Up @@ -336,12 +336,12 @@ public CommunityIdProcessor create(
*/
public static final class Flow {

private static final List<Transport> TRANSPORTS_WITH_PORTS = List.of(
Transport.Tcp,
Transport.Udp,
Transport.Sctp,
Transport.Icmp,
Transport.IcmpIpV6
private static final List<Transport.Type> TRANSPORTS_WITH_PORTS = List.of(
Transport.Type.Tcp,
Transport.Type.Udp,
Transport.Type.Sctp,
Transport.Type.Icmp,
Transport.Type.IcmpIpV6
);

InetAddress source;
Expand All @@ -362,20 +362,21 @@ boolean isOrdered() {
}

byte[] toBytes() {
boolean hasPort = TRANSPORTS_WITH_PORTS.contains(protocol);
Transport.Type protoType = protocol.getType();
boolean hasPort = TRANSPORTS_WITH_PORTS.contains(protoType);
int len = source.getAddress().length + destination.getAddress().length + 2 + (hasPort ? 4 : 0);
ByteBuffer bb = ByteBuffer.allocate(len);

boolean isOneWay = false;
if (protocol == Transport.Icmp || protocol == Transport.IcmpIpV6) {
if (protoType == Transport.Type.Icmp || protoType == Transport.Type.IcmpIpV6) {
// ICMP protocols populate port fields with ICMP data
Integer equivalent = IcmpType.codeEquivalent(icmpType, protocol == Transport.IcmpIpV6);
Integer equivalent = IcmpType.codeEquivalent(icmpType, protoType == Transport.Type.IcmpIpV6);
isOneWay = equivalent == null;
sourcePort = icmpType;
destinationPort = equivalent == null ? icmpCode : equivalent;
}

boolean keepOrder = isOrdered() || ((protocol == Transport.Icmp || protocol == Transport.IcmpIpV6) && isOneWay);
boolean keepOrder = isOrdered() || ((protoType == Transport.Type.Icmp || protoType == Transport.Type.IcmpIpV6) && isOneWay);
bb.put(keepOrder ? source.getAddress() : destination.getAddress());
bb.put(keepOrder ? destination.getAddress() : source.getAddress());
bb.put(toUint16(protocol.getTransportNumber() << 8));
Expand All @@ -397,68 +398,99 @@ String toCommunityId(byte[] seed) {
}
}

public enum Transport {
Icmp(1),
Igmp(2),
Tcp(6),
Udp(17),
Gre(47),
IcmpIpV6(58),
Eigrp(88),
Ospf(89),
Pim(103),
Sctp(132);

private final int transportNumber;
static class Transport {
public enum Type {
Unknown(-1),
Icmp(1),
Igmp(2),
Tcp(6),
Udp(17),
Gre(47),
IcmpIpV6(58),
Eigrp(88),
Ospf(89),
Pim(103),
Sctp(132);

private final int transportNumber;

private static final Map<String, Type> TRANSPORT_NAMES;

static {
TRANSPORT_NAMES = new HashMap<>();
TRANSPORT_NAMES.put("icmp", Icmp);
TRANSPORT_NAMES.put("igmp", Igmp);
TRANSPORT_NAMES.put("tcp", Tcp);
TRANSPORT_NAMES.put("udp", Udp);
TRANSPORT_NAMES.put("gre", Gre);
TRANSPORT_NAMES.put("ipv6-icmp", IcmpIpV6);
TRANSPORT_NAMES.put("icmpv6", IcmpIpV6);
TRANSPORT_NAMES.put("eigrp", Eigrp);
TRANSPORT_NAMES.put("ospf", Ospf);
TRANSPORT_NAMES.put("pim", Pim);
TRANSPORT_NAMES.put("sctp", Sctp);
}

private static final Map<String, Transport> TRANSPORT_NAMES;
Type(int transportNumber) {
this.transportNumber = transportNumber;
}

static {
TRANSPORT_NAMES = new HashMap<>();
TRANSPORT_NAMES.put("icmp", Icmp);
TRANSPORT_NAMES.put("igmp", Igmp);
TRANSPORT_NAMES.put("tcp", Tcp);
TRANSPORT_NAMES.put("udp", Udp);
TRANSPORT_NAMES.put("gre", Gre);
TRANSPORT_NAMES.put("ipv6-icmp", IcmpIpV6);
TRANSPORT_NAMES.put("icmpv6", IcmpIpV6);
TRANSPORT_NAMES.put("eigrp", Eigrp);
TRANSPORT_NAMES.put("ospf", Ospf);
TRANSPORT_NAMES.put("pim", Pim);
TRANSPORT_NAMES.put("sctp", Sctp);
public int getTransportNumber() {
return transportNumber;
}
}

Transport(int transportNumber) {
private Type type;
private int transportNumber;

Transport(int transportNumber, Type type) { // Change constructor to public
this.transportNumber = transportNumber;
this.type = type;
}

Transport(Type type) { // Change constructor to public
this.transportNumber = type.getTransportNumber();
this.type = type;
}

public Type getType() {
return this.type;
}

public int getTransportNumber() {
return transportNumber;
}

public static Transport fromNumber(int transportNumber) {
return switch (transportNumber) {
case 1 -> Icmp;
case 2 -> Igmp;
case 6 -> Tcp;
case 17 -> Udp;
case 47 -> Gre;
case 58 -> IcmpIpV6;
case 88 -> Eigrp;
case 89 -> Ospf;
case 103 -> Pim;
case 132 -> Sctp;
default -> throw new IllegalArgumentException("unknown transport protocol number [" + transportNumber + "]");
if (transportNumber < 0 || transportNumber >= 255) {
// transport numbers range https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
throw new IllegalArgumentException("invalid transport protocol number [" + transportNumber + "]");
}

Type type = switch (transportNumber) {
case 1 -> Type.Icmp;
case 2 -> Type.Igmp;
case 6 -> Type.Tcp;
case 17 -> Type.Udp;
case 47 -> Type.Gre;
case 58 -> Type.IcmpIpV6;
case 88 -> Type.Eigrp;
case 89 -> Type.Ospf;
case 103 -> Type.Pim;
case 132 -> Type.Sctp;
default -> Type.Unknown;
};

return new Transport(transportNumber, type);
}

public static Transport fromObject(Object o) {
if (o instanceof Number number) {
return fromNumber(number.intValue());
} else if (o instanceof String protocolStr) {
// check if matches protocol name
if (TRANSPORT_NAMES.containsKey(protocolStr.toLowerCase(Locale.ROOT))) {
return TRANSPORT_NAMES.get(protocolStr.toLowerCase(Locale.ROOT));
if (Type.TRANSPORT_NAMES.containsKey(protocolStr.toLowerCase(Locale.ROOT))) {
return new Transport(Type.TRANSPORT_NAMES.get(protocolStr.toLowerCase(Locale.ROOT)));
}

// check if convertible to protocol number
Expand Down
Loading

0 comments on commit 1f3489e

Please sign in to comment.