Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22977 #4227

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId repli
return awaitPrimaryReplica(replicationGroupId, timestamp, 0, TimeUnit.MILLISECONDS);
}

@Override
public ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp) {
TablePartitionId id = (TablePartitionId) replicationGroupId;

return primaryReplicas.get(id.partitionId());
}

@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId) {
return nullCompletedFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.hlc;

import static java.lang.Math.max;
import static java.time.Clock.systemUTC;
import static org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;

Expand All @@ -29,6 +28,7 @@
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.FastTimestamps;

/**
* A Hybrid Logical Clock implementation.
Expand Down Expand Up @@ -61,7 +61,7 @@ public HybridClockImpl() {
}

private static long currentTime() {
return systemUTC().instant().toEpochMilli() << LOGICAL_TIME_BITS_SIZE;
return FastTimestamps.coarseCurrentTimeMillis() << LOGICAL_TIME_BITS_SIZE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public final class IgniteSystemProperties {
/** Name of the property controlling whether, when a thread assertion is triggered, it should also be written to the log. */
public static final String THREAD_ASSERTIONS_LOG_BEFORE_THROWING = "THREAD_ASSERTIONS_LOG_BEFORE_THROWING";

/** Skip replication in a benchmark. */
public static final String IGNITE_SKIP_REPLICATION_IN_BENCHMARK = "IGNITE_SKIP_REPLICATION_IN_BENCHMARK";

/** Skip storage update in a benchmark. */
public static final String IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK = "IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK";

/**
* Enforces singleton.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
public class FastTimestamps {
private static volatile long coarseCurrentTimeMillis = System.currentTimeMillis();

private static final long UPDATE_FREQUENCY_MS = 10;
/** Note: don't change this value, because it's crucial for a timestamp generation. */
private static final long UPDATE_FREQUENCY_MS = 1;
rpuch marked this conversation as resolved.
Show resolved Hide resolved

static {
startUpdater();
Expand All @@ -46,7 +47,7 @@ public void run() {
};

updater.setDaemon(true);
updater.setPriority(10);
updater.setPriority(Thread.MAX_PRIORITY);
updater.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Clock;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.FastTimestamps;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -46,7 +43,7 @@ class HybridClockTest extends BaseIgniteAbstractTest {
/**
* Mock of a system clock.
*/
private static MockedStatic<Clock> clockMock;
private static MockedStatic<FastTimestamps> clockMock;

@Mock
private ClockUpdateListener updateListener;
Expand All @@ -61,7 +58,7 @@ public void afterEach() {
*/
@Test
public void testNow() {
clockMock = mockToEpochMilli(100);
clockMock = mockCurrentTimestamp(100);

HybridClock clock = new HybridClockImpl();

Expand All @@ -79,7 +76,7 @@ public void testNow() {
*/
@Test
public void testTick() {
clockMock = mockToEpochMilli(100);
clockMock = mockCurrentTimestamp(100);

HybridClock clock = new HybridClockImpl();

Expand Down Expand Up @@ -108,7 +105,7 @@ public void testTick() {
private void assertTimestampEquals(long sysTime, HybridTimestamp expTs, Supplier<HybridTimestamp> clo) {
closeClockMock();

clockMock = mockToEpochMilli(sysTime);
clockMock = mockCurrentTimestamp(sysTime);

assertEquals(expTs, clo.get());
}
Expand Down Expand Up @@ -167,12 +164,10 @@ void updateListenerIsNotUpdatedAfterRemoval() {
verify(updateListener, never()).onUpdate(anyLong());
}

private static MockedStatic<Clock> mockToEpochMilli(long expected) {
Clock spyClock = spy(Clock.class);
MockedStatic<Clock> clockMock = mockStatic(Clock.class);
private static MockedStatic<FastTimestamps> mockCurrentTimestamp(long expected) {
MockedStatic<FastTimestamps> clockMock = mockStatic(FastTimestamps.class);

clockMock.when(Clock::systemUTC).thenReturn(spyClock);
when(spyClock.instant()).thenReturn(Instant.ofEpochMilli(expected));
clockMock.when(FastTimestamps::coarseCurrentTimeMillis).thenReturn(expected);

return clockMock;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private void onReceiveIndexNetworkMessage(NetworkMessage message, ClusterNode se
}

/**
* Returns {@code true} iff the requested catalog version is active and all RW transactions started on versions strictly before that
* Returns {@code true} if the requested catalog version is active and all RW transactions started on versions strictly before that
* version have finished on the node.
*
* @param catalogVersion Catalog version of interest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.jetbrains.annotations.Nullable;

/** Implementation for tests. */
class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters> implements PlacementDriver {
Expand All @@ -53,6 +54,11 @@ public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId repli
return primaryReplicaMetaFutureById.get(replicationGroupId);
}

@Override
public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp) {
return primaryReplicaMetaFutureById.get(replicationGroupId).join();
}

@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void testStartAndEndCheckpoint() throws Exception {

assertThat(tracker.checkpointStartTime(), equalTo(checkpointStartTime));

assertThat(tracker.totalDuration(), greaterThanOrEqualTo(10L));
assertThat(tracker.totalDuration(), greaterThanOrEqualTo(1L));
}

@Test
Expand Down Expand Up @@ -97,7 +97,7 @@ void testSplitAndSortCheckpointPages() throws Exception {

tracker.onSplitAndSortCheckpointPagesEnd();

assertThat(tracker.splitAndSortCheckpointPagesDuration(), greaterThanOrEqualTo(10L));
assertThat(tracker.splitAndSortCheckpointPagesDuration(), greaterThanOrEqualTo(1L));
}

@Test
Expand All @@ -114,7 +114,7 @@ void testFsync() throws Exception {

tracker.onCheckpointEnd();

assertThat(tracker.fsyncDuration(), greaterThanOrEqualTo(10L));
assertThat(tracker.fsyncDuration(), greaterThanOrEqualTo(1L));
}

@Test
Expand All @@ -131,7 +131,7 @@ void testPagesWrite() throws Exception {

tracker.onFsyncStart();

assertThat(tracker.pagesWriteDuration(), greaterThanOrEqualTo(10L));
assertThat(tracker.pagesWriteDuration(), greaterThanOrEqualTo(1L));
}

@Test
Expand All @@ -148,7 +148,7 @@ void testOnMarkCheckpointBegin() throws Exception {

tracker.onMarkCheckpointBeginEnd();

assertThat(tracker.onMarkCheckpointBeginDuration(), greaterThanOrEqualTo(10L));
assertThat(tracker.onMarkCheckpointBeginDuration(), greaterThanOrEqualTo(1L));
}

@Test
Expand All @@ -165,7 +165,7 @@ void testWriteLock() throws Exception {

long beforeWriteLockDuration = tracker.beforeWriteLockDuration();

assertThat(beforeWriteLockDuration, greaterThanOrEqualTo(10L));
assertThat(beforeWriteLockDuration, greaterThanOrEqualTo(1L));
assertThat(tracker.writeLockWaitDuration(), lessThan(0L));
assertThat(tracker.writeLockHoldDuration(), equalTo(0L));

Expand All @@ -176,7 +176,7 @@ void testWriteLock() throws Exception {
long writeLockWaitDuration = tracker.writeLockWaitDuration();

assertThat(tracker.beforeWriteLockDuration(), equalTo(beforeWriteLockDuration));
assertThat(writeLockWaitDuration, greaterThanOrEqualTo(10L));
assertThat(writeLockWaitDuration, greaterThanOrEqualTo(1L));
assertThat(tracker.writeLockHoldDuration(), lessThan(0L));

waitForChangeCoarseCurrentTimeMillis();
Expand All @@ -185,7 +185,7 @@ void testWriteLock() throws Exception {

assertThat(tracker.beforeWriteLockDuration(), equalTo(beforeWriteLockDuration));
assertThat(tracker.writeLockWaitDuration(), equalTo(writeLockWaitDuration));
assertThat(tracker.writeLockHoldDuration(), greaterThanOrEqualTo(10L));
assertThat(tracker.writeLockHoldDuration(), greaterThanOrEqualTo(1L));
}

private void waitForChangeCoarseCurrentTimeMillis() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId repli
return getPrimaryReplicaMeta();
}

@Override
public ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId replicationGroupId,
HybridTimestamp timestamp) {
return getPrimaryReplicaMeta().join();
}

@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId) {
return nullCompletedFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.jetbrains.annotations.Nullable;

/**
* Service that provides an ability to await and retrieve primary replicas for replication groups.
Expand Down Expand Up @@ -70,6 +71,15 @@ CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
*/
CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp);

/**
* Returns the current primary replica without waiting.
*
* @param replicationGroupId Replication group id.
* @param timestamp The timestamp.
* @return Metadata information or null if not available.
*/
@Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp);

/**
* Returns a future that completes when all expiration event {@link PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED} listeners of previous
* primary complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
Expand Down Expand Up @@ -75,6 +76,11 @@ public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId repli
return getReplicaMetaFuture();
}

@Override
public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp) {
return getReplicaMetaFuture().join();
}

@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId) {
return nullCompletedFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId repli
return leaseTracker.getPrimaryReplica(replicationGroupId, timestamp);
}

@Override
public ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp) {
return leaseTracker.getCurrentPrimaryReplica(replicationGroupId, timestamp);
}

@Override
public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId replicationGroupId) {
return leaseTracker.previousPrimaryExpired(replicationGroupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,17 @@ public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId repli
});
}

@Override
public @Nullable ReplicaMeta getCurrentPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp) {
Lease lease = getLease(replicationGroupId);

if (lease.isAccepted() && clockService.after(lease.getExpirationTime(), timestamp)) {
return lease;
}

return null;
}

/**
* Helper method that checks whether tracker for given groupId is present in {@code primaryReplicaWaiters} map, whether it's empty
* and removes it if it's true.
Expand Down
Loading