Skip to content

Commit

Permalink
chore: fix code format and make CI pass
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
  • Loading branch information
lizhanhui committed Oct 21, 2024
1 parent b5fb5b1 commit 6f91098
Showing 1 changed file with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ public ConcurrentMap<String, ConcurrentMap<Channel, ClientChannelInfo>> getGroup
public ProducerTableInfo getProducerTable() {
Map<String, List<ProducerInfo>> map = new HashMap<>();
for (String group : this.groupChannelTable.keySet()) {
for (Entry<Channel, ClientChannelInfo> entry: this.groupChannelTable.get(group).entrySet()) {
for (Entry<Channel, ClientChannelInfo> entry : this.groupChannelTable.get(group).entrySet()) {
ClientChannelInfo clientChannelInfo = entry.getValue();
if (map.containsKey(group)) {
map.get(group).add(new ProducerInfo(
clientChannelInfo.getClientId(),
clientChannelInfo.getChannel().remoteAddress().toString(),
clientChannelInfo.getLanguage(),
clientChannelInfo.getVersion(),
clientChannelInfo.getLastUpdateTimestamp()
clientChannelInfo.getClientId(),
clientChannelInfo.getChannel().remoteAddress().toString(),
clientChannelInfo.getLanguage(),
clientChannelInfo.getVersion(),
clientChannelInfo.getLastUpdateTimestamp()
));
} else {
map.put(group, new ArrayList<>(Collections.singleton(new ProducerInfo(
Expand Down Expand Up @@ -118,8 +118,8 @@ public void scanNotActiveChannel() {
clientChannelTable.remove(info.getClientId());
}
log.warn(
"ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
"ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, info);
RemotingHelper.closeChannel(info.getChannel());
}
Expand All @@ -144,8 +144,8 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
clientChannelTable.remove(clientChannelInfo.getClientId());
removed = true;
log.info(
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group);
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group);
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo);
if (clientChannelInfoTable.isEmpty()) {
ConcurrentMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group);
Expand All @@ -169,21 +169,24 @@ public void registerProducer(final String group, final ClientChannelInfo clientC
channelTable = new ConcurrentHashMap<>();
// Make sure channelTable will NOT be cleaned by #scanNotActiveChannel
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
clientChannelInfo.setLastUpdateTimestamp(System.currentTimeMillis());

ConcurrentMap<Channel, ClientChannelInfo> prev = this.groupChannelTable.putIfAbsent(group, channelTable);
if (null != prev) {
if (null == prev) {
// Add client-id to channel mapping for new producer group
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
} else {
channelTable = prev;
}
}

clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
// Add client-channel info to existing producer group
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo.toString());
}

// Refresh existing client-channel-info update-timestamp
if (clientChannelInfoFound != null) {
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
}
Expand Down

0 comments on commit 6f91098

Please sign in to comment.