Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AKI-608 Remove EventMgr in the AionPendingState #1102

Merged
merged 5 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.aion.util.conversions.Hex.toHexString;

import java.util.EnumMap;
import org.aion.zero.impl.blockchain.AionHub.BestBlockImportCallback;
import org.aion.zero.impl.blockchain.AionHub.SelfNodeStatusCallback;
import org.aion.zero.impl.core.IDifficultyCalculator;
import static org.aion.zero.impl.core.ImportResult.EXIST;
Expand Down Expand Up @@ -197,6 +198,7 @@ public class AionBlockchainImpl implements IAionBlockchain {
.synchronizedMap(new LRUMap<>(64));

private SelfNodeStatusCallback callback;
private BestBlockImportCallback bestBlockCallback;

public AionBlockchainImpl(CfgAion cfgAion, boolean forTest) {
this(generateBCConfig(cfgAion), AionRepositoryImpl.inst(),
Expand Down Expand Up @@ -1045,26 +1047,26 @@ && getBlockStore().isBlockStored(block.getHash(), block.getNumber())) {
if (callback != null) {
callback.updateBlockStatus(block.getNumber(), block.getHash().clone(), block.getTotalDifficulty());
}

if (bestBlockCallback != null) {
long t1 = System.currentTimeMillis();

bestBlockCallback.applyBlockUpdate(block, summary.getReceipts());

AionLoggerFactory.getLogger(LogEnum.TX.toString())
.debug("Pending state update took {} ms", System.currentTimeMillis() - t1);
}
}

// fire block events
if (ret.isSuccessful()) {
if (this.evtMgr != null) {

List<IEvent> evts = new ArrayList<>();
IEvent evtOnBlock = new EventBlock(EventBlock.CALLBACK.ONBLOCK0);
evtOnBlock.setFuncArgs(Collections.singletonList(summary));
evts.add(evtOnBlock);

IEvent evtTrace = new EventBlock(EventBlock.CALLBACK.ONTRACE0);
String str = String.format("Block chain size: [ %d ]", this.getSizeInternal());
evtTrace.setFuncArgs(Collections.singletonList(str));
evts.add(evtTrace);

if (ret == IMPORTED_BEST) {
if (LOG.isTraceEnabled()) {
LOG.trace("IMPORTED_BEST");
}
IEvent evtOnBest = new EventBlock(EventBlock.CALLBACK.ONBEST0);
evtOnBest.setFuncArgs(Arrays.asList(block, summary.getReceipts()));
evts.add(evtOnBest);
Expand Down Expand Up @@ -2646,4 +2648,8 @@ public Block getBestBlockWithInfo() {
void setNodeStatusCallback(SelfNodeStatusCallback callback) {
this.callback = callback;
}

void setBestBlockImportCallback(BestBlockImportCallback callback) {
bestBlockCallback = callback;
}
}
37 changes: 30 additions & 7 deletions modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.aion.base.AionTxReceipt;
import org.aion.base.ConstantUtil;
import org.aion.evtmgr.EventMgrModule;
import org.aion.evtmgr.IEvent;
Expand All @@ -23,6 +24,7 @@
import org.aion.mcf.blockchain.BlockHeader;
import org.aion.zero.impl.SystemExitCodes;
import org.aion.zero.impl.Version;
import org.aion.zero.impl.blockchain.AionImpl.PendingTxCallback;
import org.aion.zero.impl.config.CfgNetP2p;
import org.aion.mcf.db.Repository;
import org.aion.p2p.Handler;
Expand All @@ -32,7 +34,7 @@
import org.aion.util.bytes.ByteUtil;
import org.aion.zero.impl.db.AionBlockStore;
import org.aion.zero.impl.pendingState.AionPendingStateImpl;
import org.aion.zero.impl.types.A0BlockHeader;
import org.aion.zero.impl.pendingState.IPendingState;
import org.aion.zero.impl.types.AionBlock;
import org.aion.zero.impl.types.AionGenesis;
import org.aion.zero.impl.config.CfgAion;
Expand Down Expand Up @@ -96,14 +98,15 @@ public boolean isRunning() {

private ReentrantLock blockTemplateLock;

public AionHub() {
initializeHub(CfgAion.inst(), null, AionRepositoryImpl.inst(), false);
public AionHub(PendingTxCallback pendingTxCallback) {
initializeHub(CfgAion.inst(), null, AionRepositoryImpl.inst(), pendingTxCallback, false);
}

private void initializeHub(
CfgAion _cfgAion,
AionBlockchainImpl _blockchain,
AionRepositoryImpl _repository,
PendingTxCallback pendingTxCallback,
boolean forTest) {

this.cfg = _cfgAion;
Expand All @@ -120,7 +123,7 @@ private void initializeHub(

this.repository = _repository;

this.mempool = AionPendingStateImpl.create(cfg, blockchain, repository, forTest);
this.mempool = AionPendingStateImpl.create(cfg, blockchain, repository, pendingTxCallback, forTest);

try {
loadBlockchain();
Expand Down Expand Up @@ -206,19 +209,22 @@ private void initializeHub(
callback.updateBlockStatus(blockchain.getBestBlock().getNumber(), blockchain.getBestBlock().getHash(), blockchain.getTotalDifficulty());

blockchain.setNodeStatusCallback(callback);
blockchain.setBestBlockImportCallback(new BestBlockImportCallback(mempool));
}

public static AionHub createForTesting(
CfgAion _cfgAion, AionBlockchainImpl _blockchain, AionRepositoryImpl _repository) {
return new AionHub(_cfgAion, _blockchain, _repository, true);
CfgAion _cfgAion, AionBlockchainImpl _blockchain, AionRepositoryImpl _repository,
PendingTxCallback pendingTxCallback) {
return new AionHub(_cfgAion, _blockchain, _repository, pendingTxCallback, true);
}

private AionHub(
CfgAion _cfgAion,
AionBlockchainImpl _blockchain,
AionRepositoryImpl _repository,
PendingTxCallback pendingTxCallback,
boolean forTest) {
initializeHub(_cfgAion, _blockchain, _repository, forTest);
initializeHub(_cfgAion, _blockchain, _repository, pendingTxCallback, forTest);
}

private void registerCallback() {
Expand Down Expand Up @@ -667,4 +673,21 @@ void updateBlockStatus(long number, byte[] hash, BigInteger td) {
p2pMgr.updateChainInfo(number, hash, td);
}
}

class BestBlockImportCallback {
final IPendingState mempool;

BestBlockImportCallback(IPendingState mempool) {
if (mempool == null) {
throw new IllegalStateException("Mempool input is null!");
}

this.mempool = mempool;
}

void applyBlockUpdate(Block block, List<AionTxReceipt> receipts) {
mempool.applyBlockUpdate(block, receipts);
}

}
}
41 changes: 38 additions & 3 deletions modAionImpl/src/org/aion/zero/impl/blockchain/AionImpl.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.aion.zero.impl.blockchain;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.aion.zero.impl.types.PendingTxDetails;
import org.aion.zero.impl.vm.common.VmFatalException;
import org.aion.base.AionTransaction;
import org.aion.crypto.ECKey;
Expand All @@ -22,7 +25,6 @@
import org.aion.zero.impl.config.CfgAion;
import org.aion.zero.impl.db.AionRepositoryImpl;
import org.aion.zero.impl.tx.TxCollector;
import org.aion.zero.impl.types.AionBlock;
import org.aion.base.AionTxReceipt;
import org.slf4j.Logger;

Expand All @@ -41,13 +43,15 @@ public class AionImpl implements IAionChain {

private EquihashMiner equihashMiner;

private List<BlockchainCallbackInterface> blockchainCallbackInterfaces = Collections.synchronizedList(new ArrayList<>());

private AionImpl(boolean forTest) {
this.cfg = CfgAion.inst();
if (forTest) {
cfg.setGenesisForTest();
aionHub = AionHub.createForTesting(cfg, new AionBlockchainImpl(cfg, true), AionRepositoryImpl.inst());
aionHub = AionHub.createForTesting(cfg, new AionBlockchainImpl(cfg, true), AionRepositoryImpl.inst(), new PendingTxCallback(blockchainCallbackInterfaces));
} else {
aionHub = new AionHub();
aionHub = new AionHub(new PendingTxCallback(blockchainCallbackInterfaces));
}

LOG_GEN.info(
Expand Down Expand Up @@ -255,6 +259,11 @@ public Optional<Long> getNetworkBestBlockNumber() {
}
}

@Override
public void setApiServiceCallback(BlockchainCallbackInterface blockchainCallbackForApiServer) {
blockchainCallbackInterfaces.add(blockchainCallbackForApiServer);
}

@Override
public Optional<Long> getInitialStartingBlockNumber() {
try {
Expand Down Expand Up @@ -339,4 +348,30 @@ private static class Holder {
private static class HolderForTest {
static final AionImpl INSTANCE = new AionImpl(true);
}

public static class PendingTxCallback {
List<BlockchainCallbackInterface> callbackInterfaces;

public PendingTxCallback(List<BlockchainCallbackInterface> callbackInterfaces) {
this.callbackInterfaces = callbackInterfaces;
}

public void pendingTxReceivedCallback(List<AionTransaction> newPendingTx) {
for (BlockchainCallbackInterface callbackInterface : callbackInterfaces) {
if (callbackInterface.isForApiServer()) {
for (AionTransaction tx : newPendingTx) {
callbackInterface.pendingTxReceived(tx);
}
}
}
}

public void pendingTxStateUpdateCallback(PendingTxDetails txDetails) {
for (BlockchainCallbackInterface callbackInterface : callbackInterfaces) {
if (callbackInterface.isForApiServer()) {
callbackInterface.pendingTxUpdated(txDetails);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.aion.zero.impl.blockchain;

import org.aion.base.AionTransaction;
import org.aion.zero.impl.types.PendingTxDetails;

public interface BlockchainCallbackInterface {

boolean isForApiServer();

void pendingTxReceived(AionTransaction tx);

void pendingTxUpdated(PendingTxDetails txDetails);
}
4 changes: 4 additions & 0 deletions modAionImpl/src/org/aion/zero/impl/blockchain/IAionChain.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import org.aion.base.AionTransaction;
import org.aion.equihash.EquihashMiner;
import org.aion.mcf.blockchain.Block;
import org.aion.mcf.db.Repository;
import org.aion.base.AionTxReceipt;
import org.aion.zero.impl.types.PendingTxDetails;

/** Aion chain interface. */
public interface IAionChain {
Expand Down Expand Up @@ -42,4 +44,6 @@ public interface IAionChain {
Optional<Long> getLocalBestBlockNumber();

Optional<Long> getNetworkBestBlockNumber();

void setApiServiceCallback(BlockchainCallbackInterface blockchainCallbackForApiServer);
}
Loading