Skip to content

Commit

Permalink
chore: refactoring and cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
ohager committed Jan 10, 2024
1 parent 6d4c97d commit 8340754
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 93 deletions.
52 changes: 17 additions & 35 deletions src/brs/web/api/ws/BlockchainEventNotifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

import brs.*;
import brs.props.Props;
import brs.web.api.ws.common.Debouncer2;
import brs.web.api.ws.common.Debouncer;
import brs.web.api.ws.common.SimpleScheduler;
import brs.web.api.ws.emitter.data.ConnectedEventData;
import brs.web.api.ws.common.Debouncer;
import brs.web.api.ws.emitter.*;
import brs.web.server.WebServerContext;
import org.slf4j.Logger;
Expand All @@ -17,16 +16,15 @@
public class BlockchainEventNotifier {

private final static int SHUTDOWN_TIMEOUT_SECS = 5;
private final static int HEARTBEAT_INTERVAL_SECS = 10; // make this configurable
private final static int BLOCK_PUSHED_DEBOUNCE_SECS = 1;
private final static int IO_THREAD_COUNT = 10;
private static BlockchainEventNotifier instance;
private final ExecutorService notifyExecutor;
// private final Debouncer blockPushedDebouncer;
// private final Debouncer blockPushedDebouncer;
private final Logger logger = LoggerFactory.getLogger(BlockchainEventNotifier.class);
private final WebServerContext context;
private final ConcurrentHashMap<String, WebSocketConnection> connections = new ConcurrentHashMap<>();
private final Debouncer2 blockPushedDebouncer2;
private final Debouncer blockPushedDebouncer = new Debouncer(BLOCK_PUSHED_DEBOUNCE_SECS * 1000);
private SimpleScheduler heartbeat;

public static BlockchainEventNotifier getInstance(WebServerContext context) {
Expand All @@ -42,22 +40,20 @@ private BlockchainEventNotifier(WebServerContext context) {
this.context.getBlockchainProcessor().addListener(this::onBlockGeneratedEvent, BlockchainProcessor.Event.BLOCK_GENERATED);
this.context.getBlockchainProcessor().addListener(this::onBlockPushedEvent, BlockchainProcessor.Event.BLOCK_PUSHED);
this.context.getTransactionProcessor().addListener(this::onPendingTransactionEvent, TransactionProcessor.Event.ADDED_UNCONFIRMED_TRANSACTIONS);
// this.blockPushedDebouncer = new Debouncer(BLOCK_PUSHED_DEBOUNCE_SECS * 1000);
this.blockPushedDebouncer2 = new Debouncer2();
initializeHeartBeat();
}


private void withActiveConnectionsOnly(Runnable fn) {
if (!this.connections.isEmpty()) {
if (!connections.isEmpty()) {
fn.run();
}
}

public void addConnection(WebSocketConnection connection) {
connections.put(connection.getId(), connection);

this.notifyExecutor.submit(() -> {
notifyExecutor.submit(() -> {
ConnectedEventData data = new ConnectedEventData();
data.version = Burst.VERSION.toString();
data.networkName = context.getPropertyService().getString(Props.NETWORK_NAME);
Expand Down Expand Up @@ -87,47 +83,33 @@ private void initializeHeartBeat() {
);
}


public void removeConnection(WebSocketConnection connection) {
connections.remove(connection.getId());
}

public void onBlockGeneratedEvent(Block block) {
withActiveConnectionsOnly(() -> {
notifyExecutor.submit(() -> {
connections.values().forEach(connection -> new BlockGeneratedEventEmitter(connection).emit(block));
});
});
withActiveConnectionsOnly(() -> notifyExecutor.submit(() -> {
connections.values().forEach(connection -> new BlockGeneratedEventEmitter(connection).emit(block));
}));
}

public void onBlockPushedEvent(Block block) {
withActiveConnectionsOnly(() -> {
blockPushedDebouncer2.debounce(
BlockchainEventNotifier.class,
() -> {
notifyExecutor.submit(() -> {
int currentHeight = context.getBlockchainProcessor().getLastBlockchainFeederHeight();
connections.values().forEach(connection -> new BlockPushedEventEmitter(connection, currentHeight).emit(block));
});
},
BLOCK_PUSHED_DEBOUNCE_SECS, TimeUnit.SECONDS);
});
withActiveConnectionsOnly(() -> blockPushedDebouncer.debounce(() -> notifyExecutor.submit(() -> {
int currentHeight = context.getBlockchainProcessor().getLastBlockchainFeederHeight();
connections.values().forEach(connection -> new BlockPushedEventEmitter(connection, currentHeight).emit(block));
})));
}

private void onPendingTransactionEvent(List<? extends Transaction> transactions) {
withActiveConnectionsOnly(() -> {
notifyExecutor.submit(() -> {
connections.values().forEach(connection -> new PendingTransactionsAddedEventEmitter(connection).emit(transactions));
});
});
withActiveConnectionsOnly(() -> notifyExecutor.submit(() -> {
connections.values().forEach(connection -> new PendingTransactionsAddedEventEmitter(connection).emit(transactions));
}));
}

private void shutdownNotifyExecutor() {
if (!notifyExecutor.isTerminated()) {
logger.info("Closing {} websocket connection(s)...", connections.size());
notifyExecutor.submit(() -> {
connections.values().forEach(WebSocketConnection::close);
});
notifyExecutor.submit(() -> connections.values().forEach(WebSocketConnection::close));
notifyExecutor.shutdown();
try {
if (!notifyExecutor.awaitTermination(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS)) {
Expand All @@ -146,7 +128,7 @@ public void shutdown() {
if (heartbeat != null) {
heartbeat.shutdown();
}
blockPushedDebouncer2.shutdown(SHUTDOWN_TIMEOUT_SECS);
blockPushedDebouncer.shutdown();
shutdownNotifyExecutor();
} catch (Exception e) {
logger.warn("Graceful WebSocket shutdown not successful: {}", e.getMessage());
Expand Down
3 changes: 1 addition & 2 deletions src/brs/web/api/ws/common/Debouncer.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package brs.web.api.ws.common;

import brs.web.api.ws.WebSocketConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -13,7 +12,7 @@ public class Debouncer {
private static final Logger logger = LoggerFactory.getLogger(Debouncer.class);

private final int delay;
private Timer timer;
private final Timer timer;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

public Debouncer(int delayMillies) {
Expand Down
53 changes: 0 additions & 53 deletions src/brs/web/api/ws/common/Debouncer2.java

This file was deleted.

11 changes: 8 additions & 3 deletions src/brs/web/server/ServerConnectorWebsocketBuilderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import brs.props.PropertyService;
import brs.props.Props;
import brs.web.api.ws.WebSocketConnectionCreator;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.*;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ServerConnectorWebsocketBuilderImpl implements ServerConnectorBuilder {

Expand All @@ -24,8 +27,9 @@ public ServerConnectorWebsocketBuilderImpl(WebServerContext context, ServletCont
}
@Override
public ServerConnector build(Server server) {
ServerConnector connector = new ServerConnector(server);
PropertyService propertyService = context.getPropertyService();
// TODO: WSS Support
ServerConnector connector = new ServerConnector(server);
connector.setHost(propertyService.getString(Props.API_LISTEN));
connector.setPort(propertyService.getInt(Props.API_WEBSOCKET_PORT));
connector.setReuseAddress(true);
Expand All @@ -37,4 +41,5 @@ public ServerConnector build(Server server) {
});
return connector;
}

}

0 comments on commit 8340754

Please sign in to comment.