Skip to content

Commit

Permalink
Group metadata manager & coordinator (#23)
Browse files Browse the repository at this point in the history
Master Issue: #4 

*Motivation*

Support Kafka group coordinator protocols.
  • Loading branch information
sijie authored Aug 4, 2019
1 parent 511679d commit 2ea9f8b
Show file tree
Hide file tree
Showing 12 changed files with 2,205 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kop.coordinator.group;

import io.streamnative.kop.utils.delayed.DelayedOperation;
import java.util.Optional;

/**
* Delayed heartbeat operations that are added to the purgatory for session timeout checking.
* Heartbeats are paused during rebalance.
*/
class DelayedHeartbeat extends DelayedOperation {

private final GroupCoordinator coordinator;
private final GroupMetadata group;
private final MemberMetadata member;
private long heartbeatDeadline;

DelayedHeartbeat(GroupCoordinator coordinator,
GroupMetadata group,
MemberMetadata member,
long heartbeatDeadline,
long sessionTimeout) {
super(sessionTimeout, Optional.of(group.lock()));

this.coordinator = coordinator;
this.group = group;
this.member = member;
this.heartbeatDeadline = heartbeatDeadline;
}

@Override
public void onExpiration() {
coordinator.onExpireHeartbeat(group, member, heartbeatDeadline);
}

@Override
public void onComplete() {
coordinator.onCompleteHeartbeat();
}

@Override
public boolean tryComplete() {
return coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, () -> forceComplete());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kop.coordinator.group;

import io.streamnative.kop.utils.delayed.DelayedOperation;
import java.util.Optional;

/**
* Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance.
*
* <p>Whenever a join-group request is received, check if all known group members have requested
* to re-join the group; if yes, complete this operation to proceed rebalance.
*
* <p>When the operation has expired, any known members that have not requested to re-join
* the group are marked as failed, and complete this operation to proceed rebalance with
* the rest of the group.
*/
class DelayedJoin extends DelayedOperation {

final GroupCoordinator coordinator;
final GroupMetadata group;

protected DelayedJoin(GroupCoordinator coordinator,
GroupMetadata group,
long rebalanceTimeout) {
super(rebalanceTimeout, Optional.of(group.lock()));
this.coordinator = coordinator;
this.group = group;
}

@Override
public void onExpiration() {
coordinator.onExpireJoin();
}

@Override
public void onComplete() {
coordinator.onCompleteJoin(group);
}

@Override
public boolean tryComplete() {
return coordinator.tryCompleteJoin(group, () -> forceComplete());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package io.streamnative.kop.coordinator.group;

import lombok.Data;
import lombok.Getter;
import lombok.experimental.Accessors;

/**
* Group configuration.
*/
@Data
@Accessors(fluent = true)
@Getter
public class GroupConfig {

private final int groupMinSessionTimeoutMs;
Expand Down
Loading

0 comments on commit 2ea9f8b

Please sign in to comment.