Skip to content

Commit

Permalink
wip: implemented block and pending tx ws events - tests missing
Browse files Browse the repository at this point in the history
  • Loading branch information
ohager committed Jan 8, 2024
1 parent 9f94fb8 commit 61f8c68
Show file tree
Hide file tree
Showing 17 changed files with 317 additions and 69 deletions.
9 changes: 6 additions & 3 deletions src/brs/props/Props.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ public class Props {
public static final Prop<Integer> DECIMAL_PLACES = new Prop<>("node.decimalPlaces", 8);
public static final Prop<Integer> ONE_COIN_NQT = new Prop<>("node.coinFactor", 100_000_000);
public static final Prop<Integer> API_PORT = new Prop<>("API.Port", 8125);
public static final Prop<Integer> API_WEBSOCKET_PORT = new Prop<>("API.WebSocketPort", 8126);
public static final Prop<Boolean> API_WEBSOCKET_ENABLE = new Prop<>("API.WebSocketEnable", true);

public static final Prop<String> NETWORK_NAME = new Prop<>("node.networkName", Constants.SIGNUM_NETWORK_NAME);
public static final Prop<String> GENESIS_BLOCK_ID = new Prop<>("node.genesisBlockId", Convert.toUnsignedLong(Genesis.GENESIS_BLOCK_ID));
public static final Prop<Integer> GENESIS_TIMESTAMP = new Prop<>("node.genesisTimestamp", 0);
Expand All @@ -27,8 +30,8 @@ public class Props {
public static final Prop<Integer> BLOCK_REWARD_CYCLE_PERCENTAGE = new Prop<>("node.blockRewardCycle", 95);
public static final Prop<Integer> BLOCK_REWARD_LIMIT_HEIGHT = new Prop<>("node.blockLimitHeight", 972_000);
public static final Prop<Integer> BLOCK_REWARD_LIMIT_AMOUNT = new Prop<>("node.blockLimitAmount", 100);
public static final Prop<Integer> ALIAS_RENEWAL_FREQUENCY = new Prop<>("node.aliasRenewalSeconds", 7776000);

public static final Prop<Integer> ALIAS_RENEWAL_FREQUENCY = new Prop<>("node.aliasRenewalSeconds", 7776000);

// Transaction fee cash back options
public static final Prop<String> CASH_BACK_ID = new Prop<>("node.cashBackId", "8952122635653861124");
Expand Down Expand Up @@ -105,7 +108,7 @@ public class Props {

// DB options
public static final Prop<Boolean> DB_SKIP_CHECK = new Prop<>("DB.SkipCheck", false);
public static final Prop<String> DB_URL = new Prop<>("DB.Url", "jdbc:h2:file:./db/signum;DB_CLOSE_ON_EXIT=FALSE");
public static final Prop<String> DB_URL = new Prop<>("DB.Url", "jdbc:h2:file:./db/signum-v2;DB_CLOSE_ON_EXIT=FALSE");
public static final Prop<String> DB_USERNAME = new Prop<>("DB.Username", "");
public static final Prop<String> DB_PASSWORD = new Prop<>("DB.Password", "");
public static final Prop<Integer> DB_CONNECTIONS = new Prop<>("DB.Connections", 30);
Expand Down
2 changes: 1 addition & 1 deletion src/brs/util/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public synchronized void start(int timeMultiplier) {
scheduledThreadPool.scheduleWithFixedDelay(toRun, 0, Math.max(entry.getValue() / timeMultiplier, 1), TimeUnit.MILLISECONDS);
}
backgroundJobs.clear();

// Starting multicore-Threads:
for (Map.Entry<Runnable,Long> entry : backgroundJobsCores.entrySet()) {
for (int i=0; i < cores; i++)
Expand Down
99 changes: 99 additions & 0 deletions src/brs/web/api/ws/BlockchainEventNotifier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package brs.web.api.ws;

import brs.*;
import brs.web.api.ws.handler.BlockGeneratedEventHandler;
import brs.web.api.ws.handler.PendingTransactionsAddedEventHandler;
import brs.web.api.ws.handler.TestEventHandler;
import brs.web.server.WebServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;

public class BlockchainEventNotifier {

private final static int SHUTDOWN_TIMEOUT_SECS = 5;
private final static int IO_THREAD_COUNT = 10;
private final ExecutorService executor;
private Logger logger = LoggerFactory.getLogger(BlockchainEventNotifier.class);
private static BlockchainEventNotifier instance;
private final WebServerContext context;

private ConcurrentHashMap<String, WebSocketConnection> connections = new ConcurrentHashMap<>();

public static BlockchainEventNotifier getInstance(WebServerContext context) {
if (BlockchainEventNotifier.instance == null) {
BlockchainEventNotifier.instance = new BlockchainEventNotifier(context);
}
return BlockchainEventNotifier.instance;
}

private BlockchainEventNotifier(WebServerContext context) {
this.context = context;
this.executor = Executors.newFixedThreadPool(IO_THREAD_COUNT);
this.context.getBlockchainProcessor().addListener(this::onNewBlockEvent, BlockchainProcessor.Event.BLOCK_GENERATED);
this.context.getTransactionProcessor().addListener(this::onPendingTransactionEvent, TransactionProcessor.Event.ADDED_UNCONFIRMED_TRANSACTIONS);

Timer timer = new Timer();
timer.scheduleAtFixedRate(wrap(this::onTestEvent), 0, 10_000);
}

private static TimerTask wrap(Runnable r) {
return new TimerTask() {

@Override
public void run() {
r.run();
}
};
}

public void addConnection(WebSocketConnection connection) {
connections.put(connection.getId(), connection);
}

public void removeConnection(WebSocketConnection connection) {
connections.remove(connection.getId());
}

public void onTestEvent() {
this.executor.submit(() -> {
connections.values().forEach(connection -> new TestEventHandler(connection).notify("some test data"));
});
}

public void onNewBlockEvent(Block block) {
this.executor.submit(() -> {
connections.values().forEach(connection -> new BlockGeneratedEventHandler(connection).notify(block));
});
}

private void onPendingTransactionEvent(List<? extends Transaction> transactions) {
this.executor.submit(() -> {
connections.values().forEach(connection -> new PendingTransactionsAddedEventHandler(connection).notify(transactions));
});
}

public void shutdown() {
if (!executor.isTerminated()) {
logger.info("Closing {} websocket connection(s)...", connections.size());
executor.submit(() -> {
connections.values().forEach(WebSocketConnection::close);
});
executor.shutdown();
try {
executor.awaitTermination(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS); // should be able to close all connections within 5 seconds
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!executor.isTerminated()) {
logger.warn("Some threads didn't terminate, forcing shutdown");
executor.shutdownNow();
}
}
}

}
39 changes: 39 additions & 0 deletions src/brs/web/api/ws/WebSocketConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package brs.web.api.ws;

import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class WebSocketConnection {
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);

private final Session session;

public WebSocketConnection(Session session) {
this.session = session;
}

public String getId() {
return session.getRemoteAddress().toString();
}

public Session getSession() {
return session;
}

public void sendMessage(String message) {
RemoteEndpoint remote = this.getSession().getRemote();
try {
remote.sendString(message);
} catch (IOException e) {
logger.debug("Error sending message to {}: {}", remote.getRemoteAddress().toString(), e.getMessage());
}
}

public void close() {
this.getSession().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,39 @@

package brs.web.api.ws;

import java.util.*;
import java.util.concurrent.CountDownLatch;

import brs.Block;
import brs.BlockchainProcessor;
import brs.web.server.WebServerContext;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


// https://itnext.io/writing-a-web-socket-server-with-embedded-jetty-46fe9ab1c435 -- check this.
public class EventHandler extends WebSocketAdapter {
private static final Logger logger = LoggerFactory.getLogger(EventHandler.class);
private final CountDownLatch closureLatch = new CountDownLatch(1);
private final WebServerContext context;
public class WebSocketConnectionAdapter extends WebSocketAdapter {
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnectionAdapter.class);

private Map<String, Session> connections = Collections.synchronizedMap(new HashMap<>());
private WebSocketConnection connection;
private final WebServerContext context;
private final BlockchainEventNotifier notifier;

public EventHandler(WebServerContext context) {
public WebSocketConnectionAdapter(WebServerContext context) {
this.context = context;
this.notifier = BlockchainEventNotifier.getInstance(context);
}


@Override
public void onWebSocketConnect(Session sess) {
super.onWebSocketConnect(sess);
logger.debug("Endpoint connected: {}", sess);
connections.put(sess.toString(), sess);

// add listeners to blockchain and transaction processor
// must be thread safe
// context.getBlockchainProcessor().addListener(EventHandler::notify, BlockchainProcessor.Event.BLOCK_GENERATED)
this.connection = new WebSocketConnection(sess);
this.notifier.addConnection(connection);
}

@Override
public void onWebSocketText(String message) {
super.onWebSocketText(message);
logger.debug("Received TEXT message: {}", message);
try {
this.getRemote().sendString("Echo: " + message);
} catch (Exception e) {
logger.warn("Failed to send WS message", e);
}
if (message.toLowerCase(Locale.US).contains("bye")) {
getSession().close(StatusCode.NORMAL, "Thanks");
}
}


@Override
public void onWebSocketClose(int statusCode, String reason) {
super.onWebSocketClose(statusCode, reason);
logger.debug("Socket Closed: [{}] {}", statusCode, reason);
closureLatch.countDown();
this.notifier.removeConnection(this.connection);
}

@Override
Expand All @@ -78,9 +54,4 @@ public void onWebSocketError(Throwable cause) {
cause.printStackTrace(System.err);
}


public void awaitClosure() throws InterruptedException {
logger.debug("Awaiting closure from remote");
closureLatch.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
import org.eclipse.jetty.websocket.server.JettyServerUpgradeResponse;
import org.eclipse.jetty.websocket.server.JettyWebSocketCreator;

public class EventHandlerCreator implements JettyWebSocketCreator {
public class WebSocketConnectionCreator implements JettyWebSocketCreator {

private final WebServerContext context;

public EventHandlerCreator(WebServerContext context) {
public WebSocketConnectionCreator(WebServerContext context) {
this.context = context;
}

@Override
public Object createWebSocket(JettyServerUpgradeRequest jettyServerUpgradeRequest, JettyServerUpgradeResponse jettyServerUpgradeResponse) {
return new EventHandler(this.context);
return new WebSocketConnectionAdapter(this.context);
}
}
22 changes: 22 additions & 0 deletions src/brs/web/api/ws/common/BaseWebSocketResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package brs.web.api.ws.common;

public abstract class BaseWebSocketResponse<T> {

private final String eventName;
private final T payload;

public BaseWebSocketResponse(String eventName, T payload) {
this.eventName = eventName;
this.payload = payload;
}

public abstract String toString();

public T getPayload() {
return payload;
}

public String getEventName() {
return eventName;
}
}
15 changes: 15 additions & 0 deletions src/brs/web/api/ws/common/JSONWebSocketResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package brs.web.api.ws.common;

import com.google.gson.Gson;

public class JSONWebSocketResponse<T> extends BaseWebSocketResponse<T> {
public JSONWebSocketResponse(String eventName, T payload) {
super(eventName, payload);
}

@Override
public String toString() {
Gson gson = new Gson();
return gson.toJson(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package brs.web.api.ws.handler;

import brs.web.api.ws.WebSocketConnection;

public abstract class AbstractWebSocketOutgoingEventHandlerImpl<T> implements WebSocketOutgoingEventHandler<T> {

private final WebSocketConnection connection;

protected AbstractWebSocketOutgoingEventHandlerImpl(WebSocketConnection connection) {
this.connection = connection;
}

@Override
public abstract void notify(T t);

public WebSocketConnection getConnection() {
return connection;
}
}
13 changes: 0 additions & 13 deletions src/brs/web/api/ws/handler/BlockEventHandler.java

This file was deleted.

32 changes: 32 additions & 0 deletions src/brs/web/api/ws/handler/BlockGeneratedEventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package brs.web.api.ws.handler;

import brs.Block;
import brs.Transaction;
import brs.web.api.ws.WebSocketConnection;
import brs.web.api.ws.common.JSONWebSocketResponse;

public class BlockGeneratedEventHandler extends AbstractWebSocketOutgoingEventHandlerImpl<Block> {
public BlockGeneratedEventHandler(WebSocketConnection connection) {
super(connection);
}

@Override
public void notify(Block block) {
JSONWebSocketResponse<GeneratedBlockPayload> response = new JSONWebSocketResponse<>("BlockGenerated", new GeneratedBlockPayload(block));
this.getConnection().sendMessage(response.toString());
}

private class GeneratedBlockPayload {
private final String blockId;
private final int height;
private final int timestamp;
private final String[] transactionIds;

public GeneratedBlockPayload(Block block) {
this.blockId = block.getStringId();
this.height = block.getHeight();
this.timestamp = block.getTimestamp();
this.transactionIds = block.getTransactions().stream().map(Transaction::getStringId).toArray(String[]::new);
}
}
}
Loading

0 comments on commit 61f8c68

Please sign in to comment.