1 | /* | |
2 | * Copyright OpenSearch Contributors | |
3 | * SPDX-License-Identifier: Apache-2.0 | |
4 | */ | |
5 | ||
6 | ||
7 | package org.opensearch.sql.planner.physical; | |
8 | ||
9 | import com.google.common.collect.ImmutableList; | |
10 | import java.util.Collections; | |
11 | import java.util.List; | |
12 | import java.util.Map; | |
13 | import java.util.concurrent.ConcurrentHashMap; | |
14 | import java.util.function.BiFunction; | |
15 | import java.util.function.Predicate; | |
16 | import lombok.EqualsAndHashCode; | |
17 | import lombok.Getter; | |
18 | import lombok.NonNull; | |
19 | import lombok.RequiredArgsConstructor; | |
20 | import org.opensearch.sql.data.model.ExprValue; | |
21 | import org.opensearch.sql.expression.Expression; | |
22 | import org.opensearch.sql.storage.bindingtuple.BindingTuple; | |
23 | ||
24 | /** | |
25 | * Dedupe operator. Dedupe the input {@link ExprValue} by using the {@link | |
26 | * DedupeOperator#dedupeList} The result order follow the input order. | |
27 | */ | |
28 | @Getter | |
29 | @EqualsAndHashCode(callSuper = false) | |
30 | public class DedupeOperator extends PhysicalPlan { | |
31 | @Getter | |
32 | private final PhysicalPlan input; | |
33 | @Getter | |
34 | private final List<Expression> dedupeList; | |
35 | @Getter | |
36 | private final Integer allowedDuplication; | |
37 | @Getter | |
38 | private final Boolean keepEmpty; | |
39 | @Getter | |
40 | private final Boolean consecutive; | |
41 | ||
42 | @EqualsAndHashCode.Exclude | |
43 | private final Deduper<List<ExprValue>> deduper; | |
44 | @EqualsAndHashCode.Exclude | |
45 | private ExprValue next; | |
46 | ||
47 | private static final Integer ALL_ONE_DUPLICATION = 1; | |
48 | private static final Boolean IGNORE_EMPTY = false; | |
49 | private static final Boolean NON_CONSECUTIVE = false; | |
50 |
3
1. lambda$static$0 : negated conditional → KILLED 2. lambda$static$0 : negated conditional → KILLED 3. lambda$static$0 : replaced boolean return with true for org/opensearch/sql/planner/physical/DedupeOperator::lambda$static$0 → KILLED |
private static final Predicate<ExprValue> NULL_OR_MISSING = v -> v.isNull() || v.isMissing(); |
51 | private static final Integer SEEN_FIRST_TIME = 1; | |
52 | ||
53 | @NonNull | |
54 | public DedupeOperator(PhysicalPlan input, List<Expression> dedupeList) { | |
55 | this(input, dedupeList, ALL_ONE_DUPLICATION, IGNORE_EMPTY, NON_CONSECUTIVE); | |
56 | } | |
57 | ||
58 | /** | |
59 | * Dedup Constructor. | |
60 | * @param input input {@link PhysicalPlan} | |
61 | * @param dedupeList list of dedupe {@link Expression} | |
62 | * @param allowedDuplication max allowed duplication | |
63 | * @param keepEmpty keep empty | |
64 | * @param consecutive consecutive mode | |
65 | */ | |
66 | @NonNull | |
67 | public DedupeOperator( | |
68 | PhysicalPlan input, | |
69 | List<Expression> dedupeList, | |
70 | Integer allowedDuplication, | |
71 | Boolean keepEmpty, | |
72 | Boolean consecutive) { | |
73 | this.input = input; | |
74 | this.dedupeList = dedupeList; | |
75 | this.allowedDuplication = allowedDuplication; | |
76 | this.keepEmpty = keepEmpty; | |
77 | this.consecutive = consecutive; | |
78 |
1
1. <init> : negated conditional → KILLED |
this.deduper = this.consecutive ? Deduper.consecutiveDeduper() : Deduper.historicalDeduper(); |
79 | } | |
80 | ||
81 | @Override | |
82 | public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) { | |
83 |
1
1. accept : replaced return value with null for org/opensearch/sql/planner/physical/DedupeOperator::accept → KILLED |
return visitor.visitDedupe(this, context); |
84 | } | |
85 | ||
86 | @Override | |
87 | public List<PhysicalPlan> getChild() { | |
88 |
1
1. getChild : replaced return value with Collections.emptyList for org/opensearch/sql/planner/physical/DedupeOperator::getChild → KILLED |
return Collections.singletonList(input); |
89 | } | |
90 | ||
91 | @Override | |
92 | public boolean hasNext() { | |
93 |
1
1. hasNext : negated conditional → KILLED |
while (input.hasNext()) { |
94 | ExprValue next = input.next(); | |
95 |
1
1. hasNext : negated conditional → KILLED |
if (keep(next)) { |
96 | this.next = next; | |
97 |
1
1. hasNext : replaced boolean return with false for org/opensearch/sql/planner/physical/DedupeOperator::hasNext → KILLED |
return true; |
98 | } | |
99 | } | |
100 |
1
1. hasNext : replaced boolean return with true for org/opensearch/sql/planner/physical/DedupeOperator::hasNext → TIMED_OUT |
return false; |
101 | } | |
102 | ||
103 | @Override | |
104 | public ExprValue next() { | |
105 |
1
1. next : replaced return value with null for org/opensearch/sql/planner/physical/DedupeOperator::next → KILLED |
return this.next; |
106 | } | |
107 | ||
108 | /** | |
109 | * Test the {@link ExprValue} should be keep or ignore | |
110 | * | |
111 | * <p>If any value evaluted by {@link DedupeOperator#dedupeList} is NULL or MISSING, then the * | |
112 | * return value is decided by keepEmpty option, default value is ignore. | |
113 | * | |
114 | * @param value {@link ExprValue}. | |
115 | * @return true: keep, false: ignore | |
116 | */ | |
117 | public boolean keep(ExprValue value) { | |
118 | BindingTuple bindingTuple = value.bindingTuples(); | |
119 | ImmutableList.Builder<ExprValue> dedupeKeyBuilder = new ImmutableList.Builder<>(); | |
120 | for (Expression expression : dedupeList) { | |
121 | ExprValue exprValue = expression.valueOf(bindingTuple); | |
122 |
1
1. keep : negated conditional → KILLED |
if (NULL_OR_MISSING.test(exprValue)) { |
123 |
2
1. keep : replaced boolean return with false for org/opensearch/sql/planner/physical/DedupeOperator::keep → KILLED 2. keep : replaced boolean return with true for org/opensearch/sql/planner/physical/DedupeOperator::keep → KILLED |
return keepEmpty; |
124 | } | |
125 | dedupeKeyBuilder.add(exprValue); | |
126 | } | |
127 | List<ExprValue> dedupeKey = dedupeKeyBuilder.build(); | |
128 | int seenTimes = deduper.seenTimes(dedupeKey); | |
129 |
3
1. keep : changed conditional boundary → KILLED 2. keep : negated conditional → KILLED 3. keep : replaced boolean return with true for org/opensearch/sql/planner/physical/DedupeOperator::keep → KILLED |
return seenTimes <= allowedDuplication; |
130 | } | |
131 | ||
132 | /** | |
133 | * Return how many times the dedupeKey has been seen before. The side effect is the seen times | |
134 | * will add 1 times after calling this function. | |
135 | * | |
136 | * @param <K> dedupe key | |
137 | */ | |
138 | @RequiredArgsConstructor | |
139 | static class Deduper<K> { | |
140 | private final BiFunction<Map<K, Integer>, K, Integer> seenFirstTime; | |
141 | private final Map<K, Integer> seenMap = new ConcurrentHashMap<>(); | |
142 | ||
143 | /** | |
144 | * The Historical Deduper monitor the duplicated element with all the seen value. | |
145 | */ | |
146 | public static <K> Deduper<K> historicalDeduper() { | |
147 |
1
1. historicalDeduper : replaced return value with null for org/opensearch/sql/planner/physical/DedupeOperator$Deduper::historicalDeduper → KILLED |
return new Deduper<>( |
148 | (map, key) -> { | |
149 | map.put(key, SEEN_FIRST_TIME); | |
150 |
1
1. lambda$historicalDeduper$0 : replaced Integer return value with 0 for org/opensearch/sql/planner/physical/DedupeOperator$Deduper::lambda$historicalDeduper$0 → KILLED |
return SEEN_FIRST_TIME; |
151 | }); | |
152 | } | |
153 | ||
154 | /** | |
155 | * The Consecutive Deduper monitor the duplicated element with consecutive seen value. It means | |
156 | * only the consecutive duplicated value will be counted. | |
157 | */ | |
158 | public static <K> Deduper<K> consecutiveDeduper() { | |
159 |
1
1. consecutiveDeduper : replaced return value with null for org/opensearch/sql/planner/physical/DedupeOperator$Deduper::consecutiveDeduper → KILLED |
return new Deduper<>( |
160 | (map, key) -> { | |
161 |
1
1. lambda$consecutiveDeduper$1 : removed call to java/util/Map::clear → KILLED |
map.clear(); |
162 | map.put(key, SEEN_FIRST_TIME); | |
163 |
1
1. lambda$consecutiveDeduper$1 : replaced Integer return value with 0 for org/opensearch/sql/planner/physical/DedupeOperator$Deduper::lambda$consecutiveDeduper$1 → KILLED |
return SEEN_FIRST_TIME; |
164 | }); | |
165 | } | |
166 | ||
167 | public int seenTimes(K dedupeKey) { | |
168 |
1
1. seenTimes : negated conditional → KILLED |
if (seenMap.containsKey(dedupeKey)) { |
169 |
3
1. lambda$seenTimes$2 : Replaced integer addition with subtraction → KILLED 2. lambda$seenTimes$2 : replaced Integer return value with 0 for org/opensearch/sql/planner/physical/DedupeOperator$Deduper::lambda$seenTimes$2 → KILLED 3. seenTimes : replaced int return with 0 for org/opensearch/sql/planner/physical/DedupeOperator$Deduper::seenTimes → KILLED |
return seenMap.computeIfPresent(dedupeKey, (k, v) -> v + 1); |
170 | } else { | |
171 |
1
1. seenTimes : replaced int return with 0 for org/opensearch/sql/planner/physical/DedupeOperator$Deduper::seenTimes → KILLED |
return seenFirstTime.apply(seenMap, dedupeKey); |
172 | } | |
173 | } | |
174 | } | |
175 | } | |
Mutations | ||
50 |
1.1 2.2 3.3 |
|
78 |
1.1 |
|
83 |
1.1 |
|
88 |
1.1 |
|
93 |
1.1 |
|
95 |
1.1 |
|
97 |
1.1 |
|
100 |
1.1 |
|
105 |
1.1 |
|
122 |
1.1 |
|
123 |
1.1 2.2 |
|
129 |
1.1 2.2 3.3 |
|
147 |
1.1 |
|
150 |
1.1 |
|
159 |
1.1 |
|
161 |
1.1 |
|
163 |
1.1 |
|
168 |
1.1 |
|
169 |
1.1 2.2 3.3 |
|
171 |
1.1 |