Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
Signed-off-by: Gregor Zeitlinger <gregor.zeitlinger@grafana.com>
  • Loading branch information
zeitlinger committed Oct 10, 2024
1 parent 7c683e3 commit 7278830
Showing 1 changed file with 90 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.prometheus.metrics.core.metrics;

import io.prometheus.metrics.model.snapshots.DataPointSnapshot;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
Expand All @@ -12,102 +11,104 @@

/**
* Metrics support concurrent write and scrape operations.
* <p>
* This is implemented by switching to a Buffer when the scrape starts,
* and applying the values from the buffer after the scrape ends.
*
* <p>This is implemented by switching to a Buffer when the scrape starts, and applying the values
* from the buffer after the scrape ends.
*/
class Buffer {

private static final long bufferActiveBit = 1L << 63;
private final AtomicLong observationCount = new AtomicLong(0);
private double[] observationBuffer = new double[0];
private int bufferPos = 0;
private boolean reset = false;

ReentrantLock appendLock = new ReentrantLock();
ReentrantLock runLock = new ReentrantLock();
Condition bufferFilled = appendLock.newCondition();

boolean append(double value) {
long count = observationCount.incrementAndGet();
if ((count & bufferActiveBit) == 0) {
return false; // sign bit not set -> buffer not active.
private static final long bufferActiveBit = 1L << 63;
private final AtomicLong observationCount = new AtomicLong(0);
private double[] observationBuffer = new double[0];
private int bufferPos = 0;
private boolean reset = false;

ReentrantLock appendLock = new ReentrantLock();
ReentrantLock runLock = new ReentrantLock();
Condition bufferFilled = appendLock.newCondition();

boolean append(double value) {
long count = observationCount.incrementAndGet();
if ((count & bufferActiveBit) == 0) {
return false; // sign bit not set -> buffer not active.
} else {
doAppend(value);
return true;
}
}

private void doAppend(double amount) {
try {
appendLock.lock();
if (bufferPos >= observationBuffer.length) {
observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128);
}
observationBuffer[bufferPos] = amount;
bufferPos++;

bufferFilled.signalAll();
} finally {
appendLock.unlock();
}
}

/** Must be called by the runnable in the run() method. */
void reset() {
reset = true;
}

<T extends DataPointSnapshot> T run(
Function<Long, Boolean> complete,
Supplier<T> createResult,
Consumer<Double> observeFunction) {
double[] buffer;
int bufferSize;
T result;
try {
runLock.lock();

// Signal that the buffer is active.
Long expectedCount = observationCount.getAndAdd(bufferActiveBit);
try {
appendLock.lock();

while (!complete.apply(expectedCount)) {
// Wait until all in-flight threads have added their observations to the buffer.
bufferFilled.await();
}
result = createResult.get();

// Signal that the buffer is inactive.
int expectedBufferSize;
if (reset) {
expectedBufferSize =
(int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount);
reset = false;
} else {
doAppend(value);
return true;
expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount);
}
}

private void doAppend(double amount) {
try {
appendLock.lock();
if (bufferPos >= observationBuffer.length) {
observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128);
}
observationBuffer[bufferPos] = amount;
bufferPos++;

bufferFilled.signalAll();
} finally {
appendLock.unlock();
while (bufferPos < expectedBufferSize) {
// Wait until all in-flight threads have added their observations to the buffer.
bufferFilled.await();
}
} finally {
appendLock.unlock();
}

buffer = observationBuffer;
bufferSize = bufferPos;
observationBuffer = new double[0];
bufferPos = 0;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
runLock.unlock();
}

/**
* Must be called by the runnable in the run() method.
*/
void reset() {
reset = true;
}

<T extends DataPointSnapshot> T run(Function<Long, Boolean> complete, Supplier<T> createResult, Consumer<Double> observeFunction) {
double[] buffer;
int bufferSize;
T result;
try {
runLock.lock();

// Signal that the buffer is active.
Long expectedCount = observationCount.getAndAdd(bufferActiveBit);
try {
appendLock.lock();

while (!complete.apply(expectedCount)) {
// Wait until all in-flight threads have added their observations to the buffer.
bufferFilled.await();
}
result = createResult.get();

// Signal that the buffer is inactive.
int expectedBufferSize;
if (reset) {
expectedBufferSize = (int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount);
reset = false;
} else {
expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount);
}

while (bufferPos < expectedBufferSize) {
// Wait until all in-flight threads have added their observations to the buffer.
bufferFilled.await();
}
} finally {
appendLock.unlock();
}

buffer = observationBuffer;
bufferSize = bufferPos;
observationBuffer = new double[0];
bufferPos = 0;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
runLock.unlock();
}

for (int i = 0; i < bufferSize; i++) {
observeFunction.accept(buffer[i]);
}
return result;
for (int i = 0; i < bufferSize; i++) {
observeFunction.accept(buffer[i]);
}
return result;
}
}

0 comments on commit 7278830

Please sign in to comment.