Commit
fix: min and max should use proper offsets (#41)
tylerburdsall authored Apr 2, 2024
1 parent f9b0a62 commit 81dc443
Showing 2 changed files with 24 additions and 16 deletions.
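
The bug: min(), max(), and sum() index into positiveBuckets, whose element 0 corresponds to the absolute bucket bucketStartOffset rather than to bucket 0, but the old code passed only the array index to lowerBoundary. The fix threads the offset through lowerBoundary explicitly. Below is a minimal sketch of the effect, not taken from the repository verbatim: it reuses the patched formula from this commit with a hypothetical histogram state (scale 8, buckets starting at absolute index 6196, the same numbers the test uses).

import kotlin.math.exp
import kotlin.math.ln
import kotlin.math.pow

// Natural log of 2; the repository defines an LN_2 constant for this.
private val LN_2 = ln(2.0)

// Lower boundary of a positive bucket: 2^((offset + index) * 2^-scale),
// written via exp() exactly as in the patched lowerBoundary() below.
fun lowerBoundary(scale: Number, offset: UInt, index: UInt): Double {
    val inverseScaleFactor = LN_2 * 2.0.pow(-scale.toInt())
    return exp((offset + index).toDouble() * inverseScaleFactor)
}

fun main() {
    val scale = 8
    val bucketStartOffset = 6196u

    // Old min(): the offset was dropped, so array index 0 mapped to absolute
    // bucket 0 and the reported minimum collapsed to exp(0) == 1.0.
    println(lowerBoundary(scale, 0u, 0u))                // 1.0

    // Fixed min(): array index 0 maps to absolute bucket 6196.
    println(lowerBoundary(scale, bucketStartOffset, 0u)) // ~1.93e7
}
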
@@ -286,13 +286,13 @@ sealed interface Aggregation {
// now just re-ingest
for ((oldIndex, count) in oldPositives.withIndex()) {
if (0 < count) {
- val value = lowerBoundary(oldScale, (oldBucketStartOffset + oldIndex.toUInt()))
+ val value = lowerBoundary(oldScale, oldBucketStartOffset, oldIndex.toUInt())
accumulateCount(value, count)
}
}
for ((oldIndex, count) in oldNegatives.withIndex()) {
if (0 < count) {
- val value = -lowerBoundary(oldScale, (oldBucketStartOffset + oldIndex.toUInt()))
+ val value = -lowerBoundary(oldScale, oldBucketStartOffset, oldIndex.toUInt())
accumulateCount(value, count)
}
}
@@ -340,18 +340,18 @@ sealed interface Aggregation {
* This is an approximation, just using the positive buckets for the sum.
*/
fun sum(): Double = synchronized(lock) {
- positiveBuckets.mapIndexed { index, count -> lowerBoundary(actualScale.toInt(), index.toUInt()) * count }.sum()
+ positiveBuckets.mapIndexed { index, count -> lowerBoundary(actualScale.toInt(), bucketStartOffset, index.toUInt()) * count }.sum()
}

/**
* This is an approximation, just using the positive buckets for the min.
*/
fun min(): Double = synchronized(lock) {
- return positiveBuckets.withIndex().firstOrNull { 0 < it.value }?.let { lowerBoundary(actualScale.toInt(), it.index.toUInt()) } ?: 0.0
+ return positiveBuckets.withIndex().firstOrNull { 0 < it.value }?.let { lowerBoundary(actualScale.toInt(), bucketStartOffset, it.index.toUInt()) } ?: 0.0
}

fun max(): Double = synchronized(lock) {
- return positiveBuckets.withIndex().lastOrNull { 0 < it.value }?.let { lowerBoundary(actualScale.toInt(), it.index.toUInt()) } ?: 0.0
+ return positiveBuckets.withIndex().lastOrNull { 0 < it.value }?.let { lowerBoundary(actualScale.toInt(), bucketStartOffset, it.index.toUInt()) } ?: 0.0
}

fun scale(): Int = synchronized(lock) { actualScale.toInt() }
@@ -375,12 +375,12 @@ sealed interface Aggregation {
fun valueCounts(): Sequence<Pair<Double, Long>> = synchronized(lock) {
return this.negativeBuckets.mapIndexed { index, count ->
Pair(
- lowerBoundary(actualScale.toInt(), bucketStartOffset + index.toUInt()),
+ lowerBoundary(actualScale.toInt(), bucketStartOffset, index.toUInt()),
count
)
}.asSequence() + this.positiveBuckets.mapIndexed { index, count ->
Pair(
- lowerBoundary(actualScale.toInt(), bucketStartOffset + index.toUInt()),
+ lowerBoundary(actualScale.toInt(), bucketStartOffset, index.toUInt()),
count
)
}.asSequence()
@@ -426,9 +426,9 @@ fun mapValueToScaleIndex(scale: Int, rawValue: Number): UInt {
* > their absolute value into the negative range using the same scale as the positive range. Note that in the
* > negative range, therefore, histogram buckets use lower-inclusive boundaries.
*/
- fun lowerBoundary(scale: Number, index: UInt): Double {
+ fun lowerBoundary(scale: Number, offset: UInt, index: UInt): Double {
val inverseScaleFactor = LN_2 * 2.0.pow(-scale.toInt())
- return exp(index.toDouble() * inverseScaleFactor)
+ return exp((offset + index).toDouble() * inverseScaleFactor)
}

fun UInt.saturatingSub(other: UInt): UInt {
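
Not part of the commit, just a sketch of the invariant behind the change: the boundary depends only on the sum offset + index, so lowerBoundary(scale, offset, index) must equal lowerBoundary(scale, offset + 1u, index - 1u). The new assertion added to the aggregator test below checks exactly this. A self-contained version of the check (the helper name boundary() is made up here; it mirrors the patched lowerBoundary()):

import kotlin.math.abs
import kotlin.math.exp
import kotlin.math.ln
import kotlin.math.pow

// Mirrors the patched lowerBoundary(): 2^((offset + index) * 2^-scale).
fun boundary(scale: Int, offset: UInt, index: UInt): Double =
    exp((offset + index).toDouble() * ln(2.0) * 2.0.pow(-scale))

fun main() {
    // Shifting one unit from index to offset must not change the boundary.
    val direct = boundary(8, 6196u, 42u)
    val shifted = boundary(8, 6197u, 41u)
    check(abs(direct - shifted) < 1e-6 * direct) { "boundaries should agree" }
    println("both forms give $direct")
}

The second changed file, the aggregator test, follows.
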
@@ -40,8 +40,16 @@ internal class AggregatorTest {

private fun assertValueLowerBoundary(exponentialHistogram: Aggregation.ExponentialHistogram, value: Number, expectedLowerBoundary: Number) {
val observedIndex = mapValueToScaleIndex(exponentialHistogram.scale(), value.toDouble())
- val observedBoundary = lowerBoundary(exponentialHistogram.scale(), observedIndex)
+ val observedBoundary = lowerBoundary(exponentialHistogram.scale(), 0u, observedIndex)
assertEqualsEpsilon(expectedLowerBoundary.toDouble(), observedBoundary, "boundary matches")
+ if (0 < observedIndex.toInt()) {
+     val observedOffsetBoundary = lowerBoundary(exponentialHistogram.scale(), 1u, observedIndex - 1u)
+     assertEqualsEpsilon(
+         observedBoundary,
+         observedOffsetBoundary,
+         "offset must result in the same boundary"
+     )
+ }
}

@BeforeTest
@@ -242,8 +250,8 @@ internal class AggregatorTest {
assertValueLowerBoundary(e, 24_040_000, 23984931.775)
assertValueLowerBoundary(e, 24_050_000, 24049961.522)

- assertEqualsEpsilon(19313750.368, lowerBoundary(8, 6196u), "lower boundary of histogram")
- assertEqualsEpsilon(29785874.896, lowerBoundary(8, 6196u + 160u), "upper boundary of histogram")
+ assertEqualsEpsilon(19313750.368, lowerBoundary(8, 0u, 6196u), "lower boundary of histogram")
+ assertEqualsEpsilon(29785874.896, lowerBoundary(8, 0u, 6196u + 160u), "upper boundary of histogram")

// Accumulate some data in a bucket's range
for (i in 0 until 40_000) {
@@ -270,7 +278,7 @@ internal class AggregatorTest {
assertEquals(83, e.positiveBuckets.size, "bucket count should not increase when a new bucket value is observed within the covered range")
assertEquals(1, e.positiveBuckets[0], "index 0 has a new count")
assertEquals(6195, e.bucketStartOffset(), "bucket start offset changes because we rotated down 1 position")
- assertEqualsEpsilon(29705335.561, lowerBoundary(8, 6195u + 160u), "new upper boundary of histogram")
+ assertEqualsEpsilon(29705335.561, lowerBoundary(8, 0u, 6195u + 160u), "new upper boundary of histogram")
assertEqualsEpsilon(29705335.561, lowerBoundary(8, 0u, 6195u + 160u), "new upper boundary of histogram")

//
// -------- Expand histogram range with a big number --------
Expand All @@ -287,9 +295,9 @@ internal class AggregatorTest {
//

val recursiveScaleStartCount = e.count()
- assertEquals(2199023255551.996, lowerBoundary(2, 164u), "this value gets us down into scale 2")
- assertEqualsEpsilon(4.0, lowerBoundary(2, 8u), "this value gets us down into scale 2")
- assertEqualsEpsilon(4.757, lowerBoundary(2, 9u), "this value gets us down into scale 2")
+ assertEquals(2199023255551.996, lowerBoundary(2, 0u, 164u), "this value gets us down into scale 2")
+ assertEqualsEpsilon(4.0, lowerBoundary(2, 0u, 8u), "this value gets us down into scale 2")
+ assertEqualsEpsilon(4.757, lowerBoundary(2, 0u, 9u), "this value gets us down into scale 2")
// pin the bucket's low value, at scale 2's index 8. It's not in scale 2 yet though!
e.accumulate(4.25)
// now blow the range wide, way past scale 7, resulting in a recursive scale down from 7 to precision 2.
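
For reference, and not asserted anywhere beyond the test above: since exp(i * LN_2 * 2^-scale) is just 2^(i * 2^-scale), the range boundaries used in the test can be reproduced with plain powers of two, e.g. lowerBoundary(8, 0u, 6196u) = 2^(6196/256) = 2^24.203125 ≈ 19313750.368. A minimal sketch:

import kotlin.math.pow

fun main() {
    // Base-2 form of the boundary formula at scale 8: 2^((offset + index) * 2^-8).
    println(2.0.pow(6196.0 / 256.0))           // ~19313750.368, the asserted lower boundary
    println(2.0.pow((6196.0 + 160.0) / 256.0)) // ~29785874.896, the asserted upper boundary
}
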
