From 95abacafe7a21643aface9092c3d81fc8186d043 Mon Sep 17 00:00:00 2001 From: AionJayT Date: Tue, 3 Dec 2019 13:29:43 -0500 Subject: [PATCH 1/5] AKI-608 Removed TXBACKUP0 event & renamed the txbackupflag in the pendingState --- .../pendingState/AionPendingStateImpl.java | 56 ++++++------------- 1 file changed, 18 insertions(+), 38 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java index 362ea13ffd..56a574a339 100644 --- a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java +++ b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java @@ -37,7 +37,6 @@ import org.aion.txpool.TxPoolA0; import org.aion.zero.impl.blockchain.AionBlockchainImpl; import org.aion.zero.impl.blockchain.AionImpl; -import org.aion.zero.impl.blockchain.IAionBlockchain; import org.aion.zero.impl.types.TxResponse; import org.aion.zero.impl.config.CfgFork; import org.aion.base.AccountState; @@ -134,7 +133,7 @@ public TransactionSortedSet() { private boolean loadPendingTx; - private boolean poolBackUp; + private boolean poolBackUpEnable; private Map backupPendingPoolAdd; private Map backupPendingCacheAdd; @@ -241,15 +240,6 @@ public void run() { long t2 = System.currentTimeMillis(); LOGGER_TX.debug("Pending state update took {} ms", t2 - t1); } - } else if (e.getEventType() == IHandler.TYPE.TX0.getValue() - && e.getCallbackType() == EventTx.CALLBACK.TXBACKUP0.getValue()) { - long t1 = System.currentTimeMillis(); - backupPendingTx(); - - if (LOGGER_TX.isDebugEnabled()) { - long t2 = System.currentTimeMillis(); - LOGGER_TX.debug("Pending state backupPending took {} ms", t2 - t1); - } } else if (e.getEventType() == IHandler.TYPE.POISONPILL.getValue()) { go = false; } @@ -332,10 +322,10 @@ public void init(final AionBlockchainImpl blockchain, boolean test) { this.transactionStore = blockchain.getTransactionStore(); this.evtMgr = blockchain.getEventMgr(); - this.poolBackUp = CfgAion.inst().getTx().getPoolBackup(); + this.poolBackUpEnable = CfgAion.inst().getTx().getPoolBackup(); this.replayTxBuffer = new ArrayList<>(); this.pendingTxCache = - new PendingTxCache(CfgAion.inst().getTx().getCacheMax(), poolBackUp); + new PendingTxCache(CfgAion.inst().getTx().getCacheMax(), poolBackUpEnable); this.pendingState = repository.startTracking(); this.dumpPool = test || CfgAion.inst().getTx().getPoolDump(); @@ -350,16 +340,10 @@ public void init(final AionBlockchainImpl blockchain, boolean test) { blkHandler.eventCallback(new EventCallback(ees, LOGGER_TX)); } - if (poolBackUp) { + if (poolBackUpEnable) { this.backupPendingPoolAdd = new HashMap<>(); this.backupPendingCacheAdd = new HashMap<>(); this.backupPendingPoolRemove = new HashSet<>(); - - regTxEvents(); - IHandler txHandler = this.evtMgr.getHandler(IHandler.TYPE.TX0.getValue()); - if (txHandler != null) { - txHandler.eventCallback(new EventCallback(ees, LOGGER_TX)); - } } this.bufferEnable = !test && CfgAion.inst().getTx().getBuffer(); @@ -383,7 +367,7 @@ private Set setEvtFilter() { int sn = IHandler.TYPE.BLOCK0.getValue() << 8; eventSN.add(sn + EventBlock.CALLBACK.ONBEST0.getValue()); - if (poolBackUp) { + if (poolBackUpEnable) { sn = IHandler.TYPE.TX0.getValue() << 8; eventSN.add(sn + EventTx.CALLBACK.TXBACKUP0.getValue()); } @@ -399,13 +383,6 @@ private void regBlockEvents() { this.evtMgr.registerEvent(evts); } - private void regTxEvents() { - List evts = new ArrayList<>(); - evts.add(new EventTx(EventTx.CALLBACK.TXBACKUP0)); - - this.evtMgr.registerEvent(evts); - } - public synchronized RepositoryCache getRepository() { // Todo : no class use this method. return pendingState; @@ -473,7 +450,7 @@ public synchronized List addPendingTransactions( newLargeNonceTx.add(tx); addToTxCache(tx); - if (poolBackUp) { + if (poolBackUpEnable) { backupPendingCacheAdd.put(tx.getTransactionHash(), tx.getEncoded()); } @@ -498,7 +475,7 @@ else if (cmp == 0) { newLargeNonceTx.add(tx); addToTxCache(tx); - if (poolBackUp) { + if (poolBackUpEnable) { backupPendingCacheAdd.put(tx.getTransactionHash(), tx.getEncoded()); } @@ -544,7 +521,7 @@ else if (cmp == 0) { if (implResponse.equals(TxResponse.SUCCESS)) { newPending.add(tx); - if (poolBackUp) { + if (poolBackUpEnable) { backupPendingPoolAdd.put(tx.getTransactionHash(), tx.getEncoded()); } } else { @@ -572,7 +549,7 @@ else if (bestRepoNonce(txFrom).compareTo(txNonce) < 1) { newPending.add(tx); txResponses.add(TxResponse.REPAID); - if (poolBackUp) { + if (poolBackUpEnable) { backupPendingPoolAdd.put(tx.getTransactionHash(), tx.getEncoded()); } } else { @@ -745,7 +722,7 @@ private TxResponse addPendingTransactionImpl(final AionTransaction tx) { AionTxReceipt rp = new AionTxReceipt(); rp.setTransaction(rtn.tx); - if (poolBackUp) { + if (poolBackUpEnable) { backupPendingPoolRemove.add(tx.getTransactionHash().clone()); } fireTxUpdate(rp, PendingTransactionState.DROPPED, best.get()); @@ -878,8 +855,11 @@ public synchronized void processBest(Block newBlock, List receipts) { List events = new ArrayList<>(); events.add(new EventTx(EventTx.CALLBACK.PENDINGTXSTATECHANGE0)); - if (poolBackUp) { - events.add(new EventTx(EventTx.CALLBACK.TXBACKUP0)); + if (poolBackUpEnable) { + long t1 = System.currentTimeMillis(); + backupPendingTx(); + long t2 = System.currentTimeMillis(); + LOGGER_TX.debug("Pending state backupPending took {} ms", t2 - t1); } this.evtMgr.newEvents(events); @@ -935,7 +915,7 @@ private void clearOutdated(final long blockNumber) { for (PooledTransaction pooledTx : this.txPool.getOutdatedList()) { outdated.add(pooledTx); - if (poolBackUp) { + if (poolBackUpEnable) { backupPendingPoolRemove.add(pooledTx.tx.getTransactionHash().clone()); } // @Jay @@ -993,7 +973,7 @@ private void clearPending(Block block, List receipts) { receipt = info.getReceipt(); } - if (poolBackUp) { + if (poolBackUpEnable) { backupPendingPoolRemove.add(tx.getTransactionHash().clone()); } fireTxUpdate(receipt, PendingTransactionState.INCLUDED, block); @@ -1049,7 +1029,7 @@ private List rerunTxsInPool(Block block) { } txPool.remove(new PooledTransaction(tx, receipt.getEnergyUsed())); - if (poolBackUp) { + if (poolBackUpEnable) { backupPendingPoolRemove.add(tx.getTransactionHash().clone()); } fireTxUpdate(receipt, PendingTransactionState.DROPPED, block); From 0c9df8c3560e483e2c490a4369a7e431be29bc83 Mon Sep 17 00:00:00 2001 From: AionJayT Date: Wed, 18 Dec 2019 14:03:50 -0500 Subject: [PATCH 2/5] AKI-608 Removed unused ONBLOCK events, setup callback from aionblockchainimpl to pendingstate. --- .../impl/blockchain/AionBlockchainImpl.java | 24 ++++++++------ .../aion/zero/impl/blockchain/AionHub.java | 21 ++++++++++++- .../pendingState/AionPendingStateImpl.java | 31 ++++++------------- .../zero/impl/pendingState/IPendingState.java | 4 +++ .../impl/pendingState/PendingStateTest.java | 10 +++--- 5 files changed, 54 insertions(+), 36 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/blockchain/AionBlockchainImpl.java b/modAionImpl/src/org/aion/zero/impl/blockchain/AionBlockchainImpl.java index b4be566224..a47c7c25cb 100644 --- a/modAionImpl/src/org/aion/zero/impl/blockchain/AionBlockchainImpl.java +++ b/modAionImpl/src/org/aion/zero/impl/blockchain/AionBlockchainImpl.java @@ -8,6 +8,7 @@ import static org.aion.util.conversions.Hex.toHexString; import java.util.EnumMap; +import org.aion.zero.impl.blockchain.AionHub.BestBlockImportCallback; import org.aion.zero.impl.blockchain.AionHub.SelfNodeStatusCallback; import org.aion.zero.impl.core.IDifficultyCalculator; import static org.aion.zero.impl.core.ImportResult.EXIST; @@ -197,6 +198,7 @@ public class AionBlockchainImpl implements IAionBlockchain { .synchronizedMap(new LRUMap<>(64)); private SelfNodeStatusCallback callback; + private BestBlockImportCallback bestBlockCallback; public AionBlockchainImpl(CfgAion cfgAion, boolean forTest) { this(generateBCConfig(cfgAion), AionRepositoryImpl.inst(), @@ -1045,26 +1047,26 @@ && getBlockStore().isBlockStored(block.getHash(), block.getNumber())) { if (callback != null) { callback.updateBlockStatus(block.getNumber(), block.getHash().clone(), block.getTotalDifficulty()); } + + if (bestBlockCallback != null) { + long t1 = System.currentTimeMillis(); + + bestBlockCallback.applyBlockUpdate(block, summary.getReceipts()); + + AionLoggerFactory.getLogger(LogEnum.TX.toString()) + .debug("Pending state update took {} ms", System.currentTimeMillis() - t1); + } } // fire block events if (ret.isSuccessful()) { if (this.evtMgr != null) { - List evts = new ArrayList<>(); IEvent evtOnBlock = new EventBlock(EventBlock.CALLBACK.ONBLOCK0); evtOnBlock.setFuncArgs(Collections.singletonList(summary)); evts.add(evtOnBlock); - IEvent evtTrace = new EventBlock(EventBlock.CALLBACK.ONTRACE0); - String str = String.format("Block chain size: [ %d ]", this.getSizeInternal()); - evtTrace.setFuncArgs(Collections.singletonList(str)); - evts.add(evtTrace); - if (ret == IMPORTED_BEST) { - if (LOG.isTraceEnabled()) { - LOG.trace("IMPORTED_BEST"); - } IEvent evtOnBest = new EventBlock(EventBlock.CALLBACK.ONBEST0); evtOnBest.setFuncArgs(Arrays.asList(block, summary.getReceipts())); evts.add(evtOnBest); @@ -2646,4 +2648,8 @@ public Block getBestBlockWithInfo() { void setNodeStatusCallback(SelfNodeStatusCallback callback) { this.callback = callback; } + + void setBestBlockImportCallback(BestBlockImportCallback callback) { + bestBlockCallback = callback; + } } diff --git a/modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java b/modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java index f5d2bbeba3..dbc4a78927 100644 --- a/modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java +++ b/modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java @@ -11,6 +11,7 @@ import java.util.ServiceLoader; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import org.aion.base.AionTxReceipt; import org.aion.base.ConstantUtil; import org.aion.evtmgr.EventMgrModule; import org.aion.evtmgr.IEvent; @@ -32,7 +33,7 @@ import org.aion.util.bytes.ByteUtil; import org.aion.zero.impl.db.AionBlockStore; import org.aion.zero.impl.pendingState.AionPendingStateImpl; -import org.aion.zero.impl.types.A0BlockHeader; +import org.aion.zero.impl.pendingState.IPendingState; import org.aion.zero.impl.types.AionBlock; import org.aion.zero.impl.types.AionGenesis; import org.aion.zero.impl.config.CfgAion; @@ -206,6 +207,7 @@ private void initializeHub( callback.updateBlockStatus(blockchain.getBestBlock().getNumber(), blockchain.getBestBlock().getHash(), blockchain.getTotalDifficulty()); blockchain.setNodeStatusCallback(callback); + blockchain.setBestBlockImportCallback(new BestBlockImportCallback(mempool)); } public static AionHub createForTesting( @@ -667,4 +669,21 @@ void updateBlockStatus(long number, byte[] hash, BigInteger td) { p2pMgr.updateChainInfo(number, hash, td); } } + + class BestBlockImportCallback { + final IPendingState mempool; + + BestBlockImportCallback(IPendingState mempool) { + if (mempool == null) { + throw new IllegalStateException("Mempool input is null!"); + } + + this.mempool = mempool; + } + + void applyBlockUpdate(Block block, List receipts) { + mempool.applyBlockUpdate(block, receipts); + } + + } } diff --git a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java index 56a574a339..92074948e9 100644 --- a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java +++ b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java @@ -231,16 +231,7 @@ public void run() { while (go) { IEvent e = ees.take(); - if (e.getEventType() == IHandler.TYPE.BLOCK0.getValue() - && e.getCallbackType() == EventBlock.CALLBACK.ONBEST0.getValue()) { - long t1 = System.currentTimeMillis(); - processBest((Block) e.getFuncArgs().get(0), (List) e.getFuncArgs().get(1)); - - if (LOGGER_TX.isDebugEnabled()) { - long t2 = System.currentTimeMillis(); - LOGGER_TX.debug("Pending state update took {} ms", t2 - t1); - } - } else if (e.getEventType() == IHandler.TYPE.POISONPILL.getValue()) { + if (e.getEventType() == IHandler.TYPE.POISONPILL.getValue()) { go = false; } } @@ -333,8 +324,6 @@ public void init(final AionBlockchainImpl blockchain, boolean test) { ees = new EventExecuteService(1000, "EpPS", Thread.MAX_PRIORITY, LOGGER_TX); ees.setFilter(setEvtFilter()); - regBlockEvents(); - IHandler blkHandler = this.evtMgr.getHandler(IHandler.TYPE.BLOCK0.getValue()); if (blkHandler != null) { blkHandler.eventCallback(new EventCallback(ees, LOGGER_TX)); @@ -375,14 +364,6 @@ private Set setEvtFilter() { return eventSN; } - private void regBlockEvents() { - List evts = new ArrayList<>(); - evts.add(new EventBlock(EventBlock.CALLBACK.ONBLOCK0)); - evts.add(new EventBlock(EventBlock.CALLBACK.ONBEST0)); - - this.evtMgr.registerEvent(evts); - } - public synchronized RepositoryCache getRepository() { // Todo : no class use this method. return pendingState; @@ -772,7 +753,15 @@ private Block findCommonAncestor(Block b1, Block b2) { return b1; } - public synchronized void processBest(Block newBlock, List receipts) { + /** + * AKI-608 + * The method called by the AionblockchainImpl through callback, currently it will block the block import. + * TODO : Sync or Async from the callback. + * @param newBlock + * @param receipts + */ + @Override + public synchronized void applyBlockUpdate(Block newBlock, List receipts) { if (isSeed) { // seed mode doesn't need to update the pendingState diff --git a/modAionImpl/src/org/aion/zero/impl/pendingState/IPendingState.java b/modAionImpl/src/org/aion/zero/impl/pendingState/IPendingState.java index 83582cdab2..99d07338cb 100644 --- a/modAionImpl/src/org/aion/zero/impl/pendingState/IPendingState.java +++ b/modAionImpl/src/org/aion/zero/impl/pendingState/IPendingState.java @@ -2,7 +2,11 @@ import java.util.List; import org.aion.base.AionTransaction; +import org.aion.base.AionTxReceipt; +import org.aion.mcf.blockchain.Block; public interface IPendingState { List getPendingTransactions(); + + void applyBlockUpdate(Block newBlock, List receipts); } diff --git a/modAionImpl/test/org/aion/zero/impl/pendingState/PendingStateTest.java b/modAionImpl/test/org/aion/zero/impl/pendingState/PendingStateTest.java index dd0bef193c..357a7fd5a4 100644 --- a/modAionImpl/test/org/aion/zero/impl/pendingState/PendingStateTest.java +++ b/modAionImpl/test/org/aion/zero/impl/pendingState/PendingStateTest.java @@ -437,7 +437,7 @@ public void replayTransactionWithDoubleEnergyPrice() { Pair connectResult = blockchain.tryToConnectAndFetchSummary(block); assertEquals(connectResult.getLeft(), ImportResult.IMPORTED_BEST); - (pendingState).processBest(block, connectResult.getRight().getReceipts()); + (pendingState).applyBlockUpdate(block, connectResult.getRight().getReceipts()); assertEquals(pendingState.getPendingTransactions().get(0), tx2); } @@ -477,7 +477,7 @@ public void replayTransactionWithDoubleEnergyPriceAfterSealing() { Pair connectResult = blockchain.tryToConnectAndFetchSummary(block); assertEquals(connectResult.getLeft(), ImportResult.IMPORTED_BEST); - pendingState.processBest(block, connectResult.getRight().getReceipts()); + pendingState.applyBlockUpdate(block, connectResult.getRight().getReceipts()); assertEquals(0, pendingState.getPendingTxSize()); } @@ -545,7 +545,7 @@ public void replayTransactionThatUsesEntireBalance() { Pair connectResult = blockchain.tryToConnectAndFetchSummary(block); assertEquals(connectResult.getLeft(), ImportResult.IMPORTED_BEST); - pendingState.processBest(block, connectResult.getRight().getReceipts()); + pendingState.applyBlockUpdate(block, connectResult.getRight().getReceipts()); // tx3 should replace tx2, and tx4 will now have insufficient funds so it will get dropped assertEquals(2, pendingState.getPendingTxSize()); assertEquals(pendingState.getPendingTransactions().get(1), tx3); @@ -627,7 +627,7 @@ public void replayTransactionThatThatInvalidatesMiddleTx() { Pair connectResult = blockchain.tryToConnectAndFetchSummary(block); assertEquals(connectResult.getLeft(), ImportResult.IMPORTED_BEST); - pendingState.processBest(block, connectResult.getRight().getReceipts()); + pendingState.applyBlockUpdate(block, connectResult.getRight().getReceipts()); // tx3 should replace tx2, and tx4 will now have insufficient funds so it will get dropped assertEquals(2, pendingState.getPendingTxSize()); assertEquals(pendingState.getPendingTransactions().get(1), tx3); @@ -695,7 +695,7 @@ public void replayInvalidTransactionInMiddle() { Pair connectResult = blockchain.tryToConnectAndFetchSummary(block); assertEquals(connectResult.getLeft(), ImportResult.IMPORTED_BEST); - pendingState.processBest(block, connectResult.getRight().getReceipts()); + pendingState.applyBlockUpdate(block, connectResult.getRight().getReceipts()); assertEquals(1, pendingState.getPendingTxSize()); } From 49715ca549f73f77867aa7fc679872c53d749ab0 Mon Sep 17 00:00:00 2001 From: AionJayT Date: Tue, 3 Dec 2019 15:08:10 -0500 Subject: [PATCH 3/5] AKI-608 Removed unused PENDINGTXSTATECHANGE0 event in the pendingState --- .../aion/zero/impl/pendingState/AionPendingStateImpl.java | 8 -------- modAionImpl/src/org/aion/zero/impl/pow/AionPoW.java | 1 - 2 files changed, 9 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java index 92074948e9..213b3aea65 100644 --- a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java +++ b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java @@ -556,9 +556,6 @@ else if (bestRepoNonce(txFrom).compareTo(txNonce) < 1) { IEvent evtRecv = new EventTx(EventTx.CALLBACK.PENDINGTXRECEIVED0); evtRecv.setFuncArgs(Collections.singletonList(newPending)); this.evtMgr.newEvent(evtRecv); - - IEvent evtChange = new EventTx(EventTx.CALLBACK.PENDINGTXSTATECHANGE0); - this.evtMgr.newEvent(evtChange); } if (!loadPendingTx) { @@ -841,9 +838,6 @@ public synchronized void applyBlockUpdate(Block newBlock, List re flushCachePendingTx(); - List events = new ArrayList<>(); - events.add(new EventTx(EventTx.CALLBACK.PENDINGTXSTATECHANGE0)); - if (poolBackUpEnable) { long t1 = System.currentTimeMillis(); backupPendingTx(); @@ -851,8 +845,6 @@ public synchronized void applyBlockUpdate(Block newBlock, List re LOGGER_TX.debug("Pending state backupPending took {} ms", t2 - t1); } - this.evtMgr.newEvents(events); - // This is for debug purpose, do not use in the regular kernel running. if (this.dumpPool) { DumpPool(); diff --git a/modAionImpl/src/org/aion/zero/impl/pow/AionPoW.java b/modAionImpl/src/org/aion/zero/impl/pow/AionPoW.java index 9cf594548d..8a3efb5d2f 100644 --- a/modAionImpl/src/org/aion/zero/impl/pow/AionPoW.java +++ b/modAionImpl/src/org/aion/zero/impl/pow/AionPoW.java @@ -148,7 +148,6 @@ private void setupHandler() { List txEvts = new ArrayList<>(); txEvts.add(new EventTx(EventTx.CALLBACK.PENDINGTXRECEIVED0)); txEvts.add(new EventTx(EventTx.CALLBACK.PENDINGTXUPDATE0)); - txEvts.add(new EventTx(EventTx.CALLBACK.PENDINGTXSTATECHANGE0)); eventMgr.registerEvent(txEvts); List events = new ArrayList<>(); From 8a4203d50702573e4c895a8458cf804ceac85cf7 Mon Sep 17 00:00:00 2001 From: AionJayT Date: Thu, 2 Jan 2020 14:09:31 -0500 Subject: [PATCH 4/5] AKI-608 Removed PENDINGTXSTATE events, Added blockchainCallbackInterface into the AionImpl and use it as a bridge between the apiServer and the pendingState --- .../aion/zero/impl/blockchain/AionHub.java | 16 +++-- .../aion/zero/impl/blockchain/AionImpl.java | 41 +++++++++++- .../BlockchainCallbackInterface.java | 13 ++++ .../aion/zero/impl/blockchain/IAionChain.java | 4 ++ .../pendingState/AionPendingStateImpl.java | 67 ++++++++++--------- .../zero/impl/pendingState/IPendingState.java | 2 + .../src/org/aion/zero/impl/pow/AionPoW.java | 40 ++++------- .../zero/impl/types/PendingTxDetails.java | 15 +++++ .../zero/impl/blockchain/AionHubTest.java | 13 ++-- .../impl/pendingState/PendingStateTest.java | 8 ++- .../zero/impl/sync/BlockPropagationTest.java | 9 +-- modApiServer/src/module-info.java | 1 + .../src/org/aion/api/server/ApiAion.java | 27 +------- .../BlockchainCallbackForApiServer.java | 32 +++++++++ .../src/org/aion/api/server/pb/ApiAion0.java | 36 +++++----- .../org/aion/api/server/rpc/ApiWeb3Aion.java | 4 +- .../org/aion/api/server/rpc/RpcMethods.java | 7 +- .../test/org/aion/api/server/ApiAionTest.java | 34 +--------- modBoot/src/org/aion/Aion.java | 7 +- 19 files changed, 217 insertions(+), 159 deletions(-) create mode 100644 modAionImpl/src/org/aion/zero/impl/blockchain/BlockchainCallbackInterface.java create mode 100644 modAionImpl/src/org/aion/zero/impl/types/PendingTxDetails.java create mode 100644 modApiServer/src/org/aion/api/server/BlockchainCallbackForApiServer.java diff --git a/modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java b/modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java index dbc4a78927..05a0330841 100644 --- a/modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java +++ b/modAionImpl/src/org/aion/zero/impl/blockchain/AionHub.java @@ -24,6 +24,7 @@ import org.aion.mcf.blockchain.BlockHeader; import org.aion.zero.impl.SystemExitCodes; import org.aion.zero.impl.Version; +import org.aion.zero.impl.blockchain.AionImpl.PendingTxCallback; import org.aion.zero.impl.config.CfgNetP2p; import org.aion.mcf.db.Repository; import org.aion.p2p.Handler; @@ -97,14 +98,15 @@ public boolean isRunning() { private ReentrantLock blockTemplateLock; - public AionHub() { - initializeHub(CfgAion.inst(), null, AionRepositoryImpl.inst(), false); + public AionHub(PendingTxCallback pendingTxCallback) { + initializeHub(CfgAion.inst(), null, AionRepositoryImpl.inst(), pendingTxCallback, false); } private void initializeHub( CfgAion _cfgAion, AionBlockchainImpl _blockchain, AionRepositoryImpl _repository, + PendingTxCallback pendingTxCallback, boolean forTest) { this.cfg = _cfgAion; @@ -121,7 +123,7 @@ private void initializeHub( this.repository = _repository; - this.mempool = AionPendingStateImpl.create(cfg, blockchain, repository, forTest); + this.mempool = AionPendingStateImpl.create(cfg, blockchain, repository, pendingTxCallback, forTest); try { loadBlockchain(); @@ -211,16 +213,18 @@ private void initializeHub( } public static AionHub createForTesting( - CfgAion _cfgAion, AionBlockchainImpl _blockchain, AionRepositoryImpl _repository) { - return new AionHub(_cfgAion, _blockchain, _repository, true); + CfgAion _cfgAion, AionBlockchainImpl _blockchain, AionRepositoryImpl _repository, + PendingTxCallback pendingTxCallback) { + return new AionHub(_cfgAion, _blockchain, _repository, pendingTxCallback, true); } private AionHub( CfgAion _cfgAion, AionBlockchainImpl _blockchain, AionRepositoryImpl _repository, + PendingTxCallback pendingTxCallback, boolean forTest) { - initializeHub(_cfgAion, _blockchain, _repository, forTest); + initializeHub(_cfgAion, _blockchain, _repository, pendingTxCallback, forTest); } private void registerCallback() { diff --git a/modAionImpl/src/org/aion/zero/impl/blockchain/AionImpl.java b/modAionImpl/src/org/aion/zero/impl/blockchain/AionImpl.java index 50321f85a2..8b5742fdad 100644 --- a/modAionImpl/src/org/aion/zero/impl/blockchain/AionImpl.java +++ b/modAionImpl/src/org/aion/zero/impl/blockchain/AionImpl.java @@ -1,7 +1,10 @@ package org.aion.zero.impl.blockchain; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; +import org.aion.zero.impl.types.PendingTxDetails; import org.aion.zero.impl.vm.common.VmFatalException; import org.aion.base.AionTransaction; import org.aion.crypto.ECKey; @@ -22,7 +25,6 @@ import org.aion.zero.impl.config.CfgAion; import org.aion.zero.impl.db.AionRepositoryImpl; import org.aion.zero.impl.tx.TxCollector; -import org.aion.zero.impl.types.AionBlock; import org.aion.base.AionTxReceipt; import org.slf4j.Logger; @@ -41,13 +43,15 @@ public class AionImpl implements IAionChain { private EquihashMiner equihashMiner; + private List blockchainCallbackInterfaces = Collections.synchronizedList(new ArrayList<>()); + private AionImpl(boolean forTest) { this.cfg = CfgAion.inst(); if (forTest) { cfg.setGenesisForTest(); - aionHub = AionHub.createForTesting(cfg, new AionBlockchainImpl(cfg, true), AionRepositoryImpl.inst()); + aionHub = AionHub.createForTesting(cfg, new AionBlockchainImpl(cfg, true), AionRepositoryImpl.inst(), new PendingTxCallback(blockchainCallbackInterfaces)); } else { - aionHub = new AionHub(); + aionHub = new AionHub(new PendingTxCallback(blockchainCallbackInterfaces)); } LOG_GEN.info( @@ -255,6 +259,11 @@ public Optional getNetworkBestBlockNumber() { } } + @Override + public void setApiServiceCallback(BlockchainCallbackInterface blockchainCallbackForApiServer) { + blockchainCallbackInterfaces.add(blockchainCallbackForApiServer); + } + @Override public Optional getInitialStartingBlockNumber() { try { @@ -339,4 +348,30 @@ private static class Holder { private static class HolderForTest { static final AionImpl INSTANCE = new AionImpl(true); } + + public static class PendingTxCallback { + List callbackInterfaces; + + public PendingTxCallback(List callbackInterfaces) { + this.callbackInterfaces = callbackInterfaces; + } + + public void pendingTxReceivedCallback(List newPendingTx) { + for (BlockchainCallbackInterface callbackInterface : callbackInterfaces) { + if (callbackInterface.isForApiServer()) { + for (AionTransaction tx : newPendingTx) { + callbackInterface.pendingTxReceived(tx); + } + } + } + } + + public void pendingTxStateUpdateCallback(PendingTxDetails txDetails) { + for (BlockchainCallbackInterface callbackInterface : callbackInterfaces) { + if (callbackInterface.isForApiServer()) { + callbackInterface.pendingTxUpdated(txDetails); + } + } + } + } } diff --git a/modAionImpl/src/org/aion/zero/impl/blockchain/BlockchainCallbackInterface.java b/modAionImpl/src/org/aion/zero/impl/blockchain/BlockchainCallbackInterface.java new file mode 100644 index 0000000000..79d248babe --- /dev/null +++ b/modAionImpl/src/org/aion/zero/impl/blockchain/BlockchainCallbackInterface.java @@ -0,0 +1,13 @@ +package org.aion.zero.impl.blockchain; + +import org.aion.base.AionTransaction; +import org.aion.zero.impl.types.PendingTxDetails; + +public interface BlockchainCallbackInterface { + + boolean isForApiServer(); + + void pendingTxReceived(AionTransaction tx); + + void pendingTxUpdated(PendingTxDetails txDetails); +} diff --git a/modAionImpl/src/org/aion/zero/impl/blockchain/IAionChain.java b/modAionImpl/src/org/aion/zero/impl/blockchain/IAionChain.java index 276797687e..d326beb7e3 100644 --- a/modAionImpl/src/org/aion/zero/impl/blockchain/IAionChain.java +++ b/modAionImpl/src/org/aion/zero/impl/blockchain/IAionChain.java @@ -2,11 +2,13 @@ import java.util.List; import java.util.Optional; +import java.util.concurrent.BlockingQueue; import org.aion.base.AionTransaction; import org.aion.equihash.EquihashMiner; import org.aion.mcf.blockchain.Block; import org.aion.mcf.db.Repository; import org.aion.base.AionTxReceipt; +import org.aion.zero.impl.types.PendingTxDetails; /** Aion chain interface. */ public interface IAionChain { @@ -42,4 +44,6 @@ public interface IAionChain { Optional getLocalBestBlockNumber(); Optional getNetworkBestBlockNumber(); + + void setApiServiceCallback(BlockchainCallbackInterface blockchainCallbackForApiServer); } diff --git a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java index 213b3aea65..414e679616 100644 --- a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java +++ b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java @@ -17,9 +17,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.aion.zero.impl.blockchain.AionImpl.PendingTxCallback; +import org.aion.zero.impl.types.PendingTxDetails; import org.aion.zero.impl.vm.common.VmFatalException; import org.aion.base.AionTransaction; import org.aion.base.PooledTransaction; @@ -148,6 +151,9 @@ public TransactionSortedSet() { private long fork040Block = -1; private boolean fork040Enable = false; + private AtomicBoolean pendingTxReceivedforMining; + private PendingTxCallback pendingTxCallback; + class TxBuffTask implements Runnable { @Override @@ -230,7 +236,6 @@ private final class EpPS implements Runnable { public void run() { while (go) { IEvent e = ees.take(); - if (e.getEventType() == IHandler.TYPE.POISONPILL.getValue()) { go = false; } @@ -261,13 +266,13 @@ private synchronized void backupPendingTx() { pendingTxCache.clearCacheTxHash(); } - public static AionPendingStateImpl create(CfgAion cfgAion, AionBlockchainImpl blockchain, AionRepositoryImpl repository, boolean forTest) { - AionPendingStateImpl ps = new AionPendingStateImpl(cfgAion, repository); + public static AionPendingStateImpl create(CfgAion cfgAion, AionBlockchainImpl blockchain, AionRepositoryImpl repository, PendingTxCallback pendingTxCallback, boolean forTest) { + AionPendingStateImpl ps = new AionPendingStateImpl(cfgAion, repository, pendingTxCallback); ps.init(blockchain, forTest); return ps; } - private AionPendingStateImpl(CfgAion _cfgAion, AionRepositoryImpl _repository) { + private AionPendingStateImpl(CfgAion _cfgAion, AionRepositoryImpl _repository, PendingTxCallback pendingTxCallback) { this.repository = _repository; @@ -299,6 +304,9 @@ private AionPendingStateImpl(CfgAion _cfgAion, AionRepositoryImpl _repository) { if (fork040 != null) { fork040Block = Long.valueOf(fork040); } + + this.pendingTxCallback = pendingTxCallback; + this.pendingTxReceivedforMining = new AtomicBoolean(); } public void init(final AionBlockchainImpl blockchain, boolean test) { @@ -553,9 +561,8 @@ else if (bestRepoNonce(txFrom).compareTo(txNonce) < 1) { } if (!newPending.isEmpty()) { - IEvent evtRecv = new EventTx(EventTx.CALLBACK.PENDINGTXRECEIVED0); - evtRecv.setFuncArgs(Collections.singletonList(newPending)); - this.evtMgr.newEvent(evtRecv); + pendingTxCallback.pendingTxReceivedCallback(newPending); + pendingTxReceivedforMining.set(true); } if (!loadPendingTx) { @@ -599,31 +606,22 @@ private boolean inPool(BigInteger txNonce, AionAddress from) { return (this.txPool.bestPoolNonce(from).compareTo(txNonce) > -1); } - private void fireTxUpdate( - AionTxReceipt txReceipt, PendingTransactionState state, Block block) { - if (LOGGER_TX.isTraceEnabled()) { - LOGGER_TX.trace( - String.format( - "PendingTransactionUpdate: (Tot: %3s) %12s : %s %8s %s [%s]", - getPendingTxSize(), - state, - txReceipt - .getTransaction() - .getSenderAddress() - .toString() - .substring(0, 8), - ByteUtil.byteArrayToLong(txReceipt.getTransaction().getNonce()), - block.getShortDescr(), - txReceipt.getError())); - } - - IEvent evt = new EventTx(EventTx.CALLBACK.PENDINGTXUPDATE0); - List args = new ArrayList<>(); - args.add(txReceipt); - args.add(state.getValue()); - args.add(block); - evt.setFuncArgs(args); - this.evtMgr.newEvent(evt); + private void fireTxUpdate(AionTxReceipt txReceipt, PendingTransactionState state, Block block) { + LOGGER_TX.trace( + String.format( + "PendingTransactionUpdate: (Tot: %3s) %12s : %s %8s %s [%s]", + getPendingTxSize(), + state, + txReceipt + .getTransaction() + .getSenderAddress() + .toString() + .substring(0, 8), + ByteUtil.byteArrayToLong(txReceipt.getTransaction().getNonce()), + block.getShortDescr(), + txReceipt.getError())); + + pendingTxCallback.pendingTxStateUpdateCallback(new PendingTxDetails(state.getValue(), txReceipt, block.getNumber())); } /** @@ -851,6 +849,11 @@ public synchronized void applyBlockUpdate(Block newBlock, List re } } + @Override + public void setNewPendingReceiveForMining(boolean newPendingTxReceived) { + pendingTxReceivedforMining.set(newPendingTxReceived); + } + private void flushCachePendingTx() { Set cacheTxAccount = this.pendingTxCache.getCacheTxAccount(); diff --git a/modAionImpl/src/org/aion/zero/impl/pendingState/IPendingState.java b/modAionImpl/src/org/aion/zero/impl/pendingState/IPendingState.java index 99d07338cb..23aa2e3daa 100644 --- a/modAionImpl/src/org/aion/zero/impl/pendingState/IPendingState.java +++ b/modAionImpl/src/org/aion/zero/impl/pendingState/IPendingState.java @@ -9,4 +9,6 @@ public interface IPendingState { List getPendingTransactions(); void applyBlockUpdate(Block newBlock, List receipts); + + void setNewPendingReceiveForMining(boolean newPendingTxReceived); } diff --git a/modAionImpl/src/org/aion/zero/impl/pow/AionPoW.java b/modAionImpl/src/org/aion/zero/impl/pow/AionPoW.java index 8a3efb5d2f..3ce3e6f407 100644 --- a/modAionImpl/src/org/aion/zero/impl/pow/AionPoW.java +++ b/modAionImpl/src/org/aion/zero/impl/pow/AionPoW.java @@ -21,7 +21,6 @@ import org.aion.evtmgr.impl.es.EventExecuteService; import org.aion.evtmgr.impl.evt.EventBlock; import org.aion.evtmgr.impl.evt.EventConsensus; -import org.aion.evtmgr.impl.evt.EventTx; import org.aion.log.AionLoggerFactory; import org.aion.log.LogEnum; import org.aion.mcf.blockchain.Block; @@ -47,13 +46,13 @@ public class AionPoW { protected IAionBlockchain blockchain; protected IPendingState pendingState; - protected IEventMgr eventMgr; + private IEventMgr eventMgr; - protected AtomicBoolean initialized = new AtomicBoolean(false); - protected AtomicBoolean newPendingTxReceived = new AtomicBoolean(false); + private AtomicBoolean initialized = new AtomicBoolean(false); + private AtomicBoolean newPendingTxReceived = new AtomicBoolean(false); // This value is the time of the last "full update" of the block template, that is, the last time // we created a fresh block from createNewBlockTemplate() - protected AtomicLong lastUpdate = new AtomicLong(0); + private AtomicLong lastUpdate = new AtomicLong(0); private AionBlock latestBlockTemplate; private AtomicBoolean shutDown = new AtomicBoolean(); @@ -68,12 +67,8 @@ private final class EpPOW implements Runnable { public void run() { while (go) { IEvent e = ees.take(); - - if (e.getEventType() == IHandler.TYPE.TX0.getValue() - && e.getCallbackType() == EventTx.CALLBACK.PENDINGTXRECEIVED0.getValue()) { - newPendingTxReceived.set(true); - } else if (e.getEventType() == IHandler.TYPE.BLOCK0.getValue() - && e.getCallbackType() == EventBlock.CALLBACK.ONBEST0.getValue()) { + if (e.getEventType() == IHandler.TYPE.BLOCK0.getValue() + && e.getCallbackType() == EventBlock.CALLBACK.ONBEST0.getValue()) { // create a new block template every time the best block // updates. createNewBlockTemplate(); @@ -106,6 +101,8 @@ public void init(IAionBlockchain blockchain, IPendingState pendingState, IEventM if (initialized.compareAndSet(false, true)) { this.blockchain = blockchain; this.pendingState = pendingState; + this.pendingState.setNewPendingReceiveForMining(newPendingTxReceived.get()); + this.eventMgr = eventMgr; this.syncMgr = syncMgr; @@ -145,11 +142,6 @@ public void init(IAionBlockchain blockchain, IPendingState pendingState, IEventM /** Sets up the consensus event handler. */ private void setupHandler() { - List txEvts = new ArrayList<>(); - txEvts.add(new EventTx(EventTx.CALLBACK.PENDINGTXRECEIVED0)); - txEvts.add(new EventTx(EventTx.CALLBACK.PENDINGTXUPDATE0)); - eventMgr.registerEvent(txEvts); - List events = new ArrayList<>(); events.add(new EventConsensus(EventConsensus.CALLBACK.ON_BLOCK_TEMPLATE)); events.add(new EventConsensus(EventConsensus.CALLBACK.ON_SOLUTION)); @@ -158,10 +150,7 @@ private void setupHandler() { private Set setEvtFilter() { Set eventSN = new HashSet<>(); - int sn = IHandler.TYPE.TX0.getValue() << 8; - eventSN.add(sn + EventTx.CALLBACK.PENDINGTXRECEIVED0.getValue()); - - sn = IHandler.TYPE.CONSENSUS.getValue() << 8; + int sn = IHandler.TYPE.CONSENSUS.getValue() << 8; eventSN.add(sn + EventConsensus.CALLBACK.ON_SOLUTION.getValue()); sn = IHandler.TYPE.BLOCK0.getValue() << 8; @@ -174,15 +163,12 @@ private Set setEvtFilter() { * Registers callback for the {@link * org.aion.evtmgr.impl.evt.EventConsensus.CALLBACK#ON_SOLUTION} event. */ - public void registerCallback() { + private void registerCallback() { IHandler consensusHandler = eventMgr.getHandler(IHandler.TYPE.CONSENSUS.getValue()); consensusHandler.eventCallback(new EventCallback(ees, LOG)); IHandler blockHandler = eventMgr.getHandler(IHandler.TYPE.BLOCK0.getValue()); blockHandler.eventCallback(new EventCallback(ees, LOG)); - - IHandler transactionHandler = eventMgr.getHandler(IHandler.TYPE.TX0.getValue()); - transactionHandler.eventCallback(new EventCallback(ees, LOG)); } /** @@ -190,7 +176,7 @@ public void registerCallback() { * * @param solution The generated equihash solution */ - protected synchronized void processSolution(AionPowSolution solution) { + private synchronized void processSolution(AionPowSolution solution) { if (!shutDown.get()) { if (LOG.isDebugEnabled()) { LOG.debug("Best block num [{}]", blockchain.getBestBlock().getNumber()); @@ -249,7 +235,7 @@ protected synchronized void processSolution(AionPowSolution solution) { } /** Creates a new block template. */ - protected synchronized void createNewBlockTemplate() { + private synchronized void createNewBlockTemplate() { if (!shutDown.get()) { // TODO: Validate the trustworthiness of getNetworkBestBlock - can // it be used in DDOS? @@ -292,7 +278,7 @@ protected synchronized void createNewBlockTemplate() { } /** Creates a new block template. */ - protected synchronized void updateTimestamp(long systemTime) { + private synchronized void updateTimestamp(long systemTime) { if (!shutDown.get() && systemTime > latestBlockTemplate.getTimestamp()) { A0BlockHeader newHeader = latestBlockTemplate.getHeader().updateTimestamp(systemTime); AionBlock newBlock = new AionBlock(newHeader, latestBlockTemplate.getTransactionsList()); diff --git a/modAionImpl/src/org/aion/zero/impl/types/PendingTxDetails.java b/modAionImpl/src/org/aion/zero/impl/types/PendingTxDetails.java new file mode 100644 index 0000000000..ad87027cfb --- /dev/null +++ b/modAionImpl/src/org/aion/zero/impl/types/PendingTxDetails.java @@ -0,0 +1,15 @@ +package org.aion.zero.impl.types; + +import org.aion.base.AionTxReceipt; + +public class PendingTxDetails { + public final int state; + public final AionTxReceipt receipt; + public final long blockNumber; + + public PendingTxDetails(int pendingState, AionTxReceipt receipt, long blockNumber) { + state = pendingState; + this.receipt = receipt; + this.blockNumber = blockNumber; + } +} diff --git a/modAionImpl/test/org/aion/zero/impl/blockchain/AionHubTest.java b/modAionImpl/test/org/aion/zero/impl/blockchain/AionHubTest.java index 0c70a04d29..9a1de266f3 100644 --- a/modAionImpl/test/org/aion/zero/impl/blockchain/AionHubTest.java +++ b/modAionImpl/test/org/aion/zero/impl/blockchain/AionHubTest.java @@ -18,6 +18,7 @@ import org.aion.log.LogEnum; import org.aion.log.LogLevel; import org.aion.mcf.blockchain.Block; +import org.aion.zero.impl.blockchain.AionImpl.PendingTxCallback; import org.aion.zero.impl.core.ImportResult; import org.aion.zero.impl.trie.TrieImpl; import org.aion.zero.impl.types.BlockContext; @@ -92,7 +93,8 @@ public void MockHubInst_wStartAtGenesis() { StandaloneBlockchain chain = bundle.bc; chain.setBestBlock(chain.getGenesis()); - AionHub hub = AionHub.createForTesting(CfgAion.inst(), chain, chain.getRepository()); + AionHub hub = AionHub.createForTesting(CfgAion.inst(), chain, chain.getRepository(), + new PendingTxCallback(new ArrayList<>())); checkHubNullity(hub); Block blk = hub.getStartingBlock(); @@ -112,7 +114,8 @@ public void MockHubInst_wStartAtBlock() { int expectedStartBlock = 6; generateRandomChainWithoutTransactions(chain, expectedStartBlock, 1); - AionHub hub = AionHub.createForTesting(CfgAion.inst(), chain, chain.getRepository()); + AionHub hub = AionHub.createForTesting(CfgAion.inst(), chain, chain.getRepository(), + new PendingTxCallback(new ArrayList<>())); checkHubNullity(hub); Block blk = hub.getStartingBlock(); @@ -179,7 +182,8 @@ public void MockHubInst_wStartRecovery() { assertThat(trie.isValidRoot(chain.getBestBlock().getStateRoot())).isFalse(); // recovery should be called by loadBlockchain() - AionHub hub = AionHub.createForTesting(CfgAion.inst(), chain, chain.getRepository()); + AionHub hub = AionHub.createForTesting(CfgAion.inst(), chain, chain.getRepository(), + new PendingTxCallback(new ArrayList<>())); checkHubNullity(hub); Block blk = hub.getStartingBlock(); @@ -267,7 +271,8 @@ public void MockHubInst_wStartRollback() { repo.flush(); // recovery should be called by loadBlockchain() - AionHub hub = AionHub.createForTesting(CfgAion.inst(), chain, chain.getRepository()); + AionHub hub = AionHub.createForTesting(CfgAion.inst(), chain, chain.getRepository(), + new PendingTxCallback(new ArrayList<>())); checkHubNullity(hub); assertEquals(td6, chain.getTotalDifficulty()); diff --git a/modAionImpl/test/org/aion/zero/impl/pendingState/PendingStateTest.java b/modAionImpl/test/org/aion/zero/impl/pendingState/PendingStateTest.java index 357a7fd5a4..686ae5ee0c 100644 --- a/modAionImpl/test/org/aion/zero/impl/pendingState/PendingStateTest.java +++ b/modAionImpl/test/org/aion/zero/impl/pendingState/PendingStateTest.java @@ -5,6 +5,7 @@ import static org.junit.Assert.assertNotEquals; import java.math.BigInteger; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.aion.avm.stub.IAvmResourceFactory; @@ -14,6 +15,7 @@ import org.aion.base.TxUtil; import org.aion.crypto.ECKey; import org.aion.zero.impl.blockchain.AionHub; +import org.aion.zero.impl.blockchain.AionImpl.PendingTxCallback; import org.aion.zero.impl.blockchain.StandaloneBlockchain; import org.aion.zero.impl.blockchain.StandaloneBlockchain.Bundle; import org.aion.zero.impl.types.TxResponse; @@ -67,7 +69,8 @@ public void reinitialize() throws SecurityException, IllegalArgumentException { CfgAion.inst().setGenesis(blockchain.getGenesis()); - pendingState = AionHub.createForTesting(CfgAion.inst(), blockchain, blockchain.getRepository()).getPendingState(); + pendingState = AionHub.createForTesting(CfgAion.inst(), blockchain, blockchain.getRepository(), + new PendingTxCallback(new ArrayList<>())).getPendingState(); } @Test @@ -726,7 +729,8 @@ public void testAionPendingStateInit() { CfgAion.inst().getConsensus().setSeed(true); // NullPointerException should not happens - AionHub.createForTesting(CfgAion.inst(), blockchain, blockchain.getRepository()); + AionHub.createForTesting(CfgAion.inst(), blockchain, blockchain.getRepository(), + new PendingTxCallback(new ArrayList<>())); CfgAion.inst().getConsensus().setSeed(false); } diff --git a/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java b/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java index 8211399353..1d8d19a83d 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/BlockPropagationTest.java @@ -24,6 +24,7 @@ import org.aion.p2p.IP2pMgr; import org.aion.p2p.IPeerMetric; import org.aion.p2p.Msg; +import org.aion.zero.impl.blockchain.AionImpl.PendingTxCallback; import org.aion.zero.impl.blockchain.StandaloneBlockchain; import org.aion.zero.impl.pendingState.AionPendingStateImpl; import org.aion.zero.impl.config.CfgAion; @@ -342,7 +343,7 @@ public void send(int _nodeId, String s, Msg _msg) { anotherBundle.bc.getBlockHeaderValidator(), false, (byte) 2, - AionPendingStateImpl.create(CfgAion.inst(), anotherBundle.bc, AionRepositoryImpl.inst(), true)); + AionPendingStateImpl.create(CfgAion.inst(), anotherBundle.bc, AionRepositoryImpl.inst(), new PendingTxCallback(new ArrayList<>()), true)); assertThat(handler.processIncomingBlock(senderMock.getIdHash(), "test", block)) .isEqualTo(BlockPropagationHandler.PropStatus.CONNECTED); @@ -409,7 +410,7 @@ public void send(int _nodeId, String s, Msg _msg) { anotherBundle.bc.getBlockHeaderValidator(), false, (byte) 2, - AionPendingStateImpl.create(CfgAion.inst(), anotherBundle.bc, AionRepositoryImpl.inst(), true)); + AionPendingStateImpl.create(CfgAion.inst(), anotherBundle.bc, AionRepositoryImpl.inst(), new PendingTxCallback(new ArrayList<>()), true)); // block is processed assertThat(handler.processIncomingBlock(senderMock.getIdHash(), "test", block)) @@ -472,7 +473,7 @@ public void send(int _nodeId, String s, Msg _msg) { anotherBundle.bc.getBlockHeaderValidator(), false, (byte) 2, - AionPendingStateImpl.create(CfgAion.inst(), anotherBundle.bc, AionRepositoryImpl.inst(), true)); + AionPendingStateImpl.create(CfgAion.inst(), anotherBundle.bc, AionRepositoryImpl.inst(), new PendingTxCallback(new ArrayList<>()), true)); // block is processed assertThat(handler.processIncomingBlock(senderMock.getIdHash(), "test", block)) @@ -530,7 +531,7 @@ public void send(int _nodeId, String s, Msg _msg) { anotherBundle.bc.getBlockHeaderValidator(), false, (byte) 2, - AionPendingStateImpl.create(CfgAion.inst(), anotherBundle.bc, AionRepositoryImpl.inst(), true)); + AionPendingStateImpl.create(CfgAion.inst(), anotherBundle.bc, AionRepositoryImpl.inst(), new PendingTxCallback(new ArrayList<>()), true)); // pretend that we propagate the new block handler.propagateNewBlock(block); // send counter incremented diff --git a/modApiServer/src/module-info.java b/modApiServer/src/module-info.java index 81eec315aa..4aba7de7b7 100644 --- a/modApiServer/src/module-info.java +++ b/modApiServer/src/module-info.java @@ -27,6 +27,7 @@ requires com.fasterxml.jackson.databind; requires rpc.lib4j; + exports org.aion.api.server; exports org.aion.api.server.pb; exports org.aion.api.server.zmq; exports org.aion.api.server.http; diff --git a/modApiServer/src/org/aion/api/server/ApiAion.java b/modApiServer/src/org/aion/api/server/ApiAion.java index bdd3693b4c..09b22cfc63 100644 --- a/modApiServer/src/org/aion/api/server/ApiAion.java +++ b/modApiServer/src/org/aion/api/server/ApiAion.java @@ -1,7 +1,5 @@ package org.aion.api.server; -import static org.aion.evtmgr.impl.evt.EventTx.STATE.GETSTATE; - import java.math.BigInteger; import java.util.Arrays; import java.util.Collections; @@ -28,7 +26,6 @@ import org.aion.evtmgr.IHandler; import org.aion.evtmgr.impl.es.EventExecuteService; import org.aion.evtmgr.impl.evt.EventBlock; -import org.aion.evtmgr.impl.evt.EventTx; import org.aion.mcf.blockchain.Block; import org.aion.zero.impl.blockchain.UnityChain; import org.aion.zero.impl.types.TxResponse; @@ -94,8 +91,6 @@ public ApiAion(final IAionChain ac, final AccountManager am) { fltrIndex = new AtomicLong(0); pendingState = ac.getAionHub().getPendingState(); IEventMgr evtMgr = ac.getAionHub().getEventMgr(); - evtMgr.registerEvent( - Collections.singletonList(new EventTx(EventTx.CALLBACK.PENDINGTXUPDATE0))); evtMgr.registerEvent( Collections.singletonList(new EventBlock(EventBlock.CALLBACK.ONBLOCK0))); } @@ -109,20 +104,8 @@ public void run() { try { IEvent e = ees.take(); if (e.getEventType() == IHandler.TYPE.BLOCK0.getValue() - && e.getCallbackType() == EventBlock.CALLBACK.ONBLOCK0.getValue()) { + && e.getCallbackType() == EventBlock.CALLBACK.ONBLOCK0.getValue()) { onBlock((AionBlockSummary) e.getFuncArgs().get(0)); - } else if (e.getEventType() == IHandler.TYPE.TX0.getValue()) { - if (e.getCallbackType() == EventTx.CALLBACK.PENDINGTXUPDATE0.getValue()) { - pendingTxUpdate( - (AionTxReceipt) e.getFuncArgs().get(0), - GETSTATE((int) e.getFuncArgs().get(1))); - } else if (e.getCallbackType() - == EventTx.CALLBACK.PENDINGTXRECEIVED0.getValue()) { - for (AionTransaction tx : - (List) e.getFuncArgs().get(0)) { - pendingTxReceived(tx); - } - } } else if (e.getEventType() == IHandler.TYPE.POISONPILL.getValue()) { go = false; } @@ -137,7 +120,7 @@ public void run() { protected abstract void pendingTxReceived(AionTransaction _tx); - protected abstract void pendingTxUpdate(AionTxReceipt _txRcpt, EventTx.STATE _state); + protected abstract void pendingTxUpdate(AionTxReceipt _txRcpt, int _state); // General Level public byte getApiVersion() { @@ -775,11 +758,7 @@ protected void startES(String thName) { private Set setEvtfilter() { Set eventSN = new HashSet<>(); - int sn = IHandler.TYPE.TX0.getValue() << 8; - eventSN.add(sn + EventTx.CALLBACK.PENDINGTXRECEIVED0.getValue()); - eventSN.add(sn + EventTx.CALLBACK.PENDINGTXUPDATE0.getValue()); - - sn = IHandler.TYPE.BLOCK0.getValue() << 8; + int sn = IHandler.TYPE.BLOCK0.getValue() << 8; eventSN.add(sn + EventBlock.CALLBACK.ONBLOCK0.getValue()); return eventSN; diff --git a/modApiServer/src/org/aion/api/server/BlockchainCallbackForApiServer.java b/modApiServer/src/org/aion/api/server/BlockchainCallbackForApiServer.java new file mode 100644 index 0000000000..7546f33f90 --- /dev/null +++ b/modApiServer/src/org/aion/api/server/BlockchainCallbackForApiServer.java @@ -0,0 +1,32 @@ +package org.aion.api.server; + +import org.aion.base.AionTransaction; +import org.aion.zero.impl.blockchain.BlockchainCallbackInterface; +import org.aion.zero.impl.types.PendingTxDetails; + +public class BlockchainCallbackForApiServer implements BlockchainCallbackInterface { + private ApiAion apiService; + + public BlockchainCallbackForApiServer(ApiAion apiService) { + if (apiService == null) { + throw new NullPointerException(); + } + + this.apiService = apiService; + } + + @Override + public boolean isForApiServer() { + return true; + } + + @Override + public void pendingTxReceived(AionTransaction tx) { + apiService.pendingTxReceived(tx); + } + + @Override + public void pendingTxUpdated(PendingTxDetails txDetails) { + apiService.pendingTxUpdate(txDetails.receipt, txDetails.state); + } +} diff --git a/modApiServer/src/org/aion/api/server/pb/ApiAion0.java b/modApiServer/src/org/aion/api/server/pb/ApiAion0.java index de66835237..f86c0c7318 100644 --- a/modApiServer/src/org/aion/api/server/pb/ApiAion0.java +++ b/modApiServer/src/org/aion/api/server/pb/ApiAion0.java @@ -48,7 +48,6 @@ import org.aion.evtmgr.impl.callback.EventCallback; import org.aion.evtmgr.impl.es.EventExecuteService; import org.aion.evtmgr.impl.evt.EventBlock; -import org.aion.evtmgr.impl.evt.EventTx; import org.aion.mcf.blockchain.BlockHeader.BlockSealType; import org.aion.zero.impl.keystore.Keystore; import org.aion.mcf.blockchain.Block; @@ -65,6 +64,7 @@ import org.aion.zero.impl.Version; import org.aion.zero.impl.blockchain.IAionChain; import org.aion.zero.impl.config.CfgAion; +import org.aion.zero.impl.pendingState.PendingTransactionState; import org.aion.zero.impl.sync.NodeWrapper; import org.aion.zero.impl.types.AionBlock; import org.aion.zero.impl.types.AionBlockSummary; @@ -187,16 +187,15 @@ protected void pendingTxReceived(AionTransaction _tx) { }); } - protected void pendingTxUpdate(AionTxReceipt _txRcpt, EventTx.STATE _state) { + @Override + protected void pendingTxUpdate(AionTxReceipt _txRcpt, int _state) { ByteArrayWrapper txHashW = ByteArrayWrapper.wrap(_txRcpt.getTransaction().getTransactionHash()); - if (LOG.isTraceEnabled()) { - LOG.trace( - "ApiAion0.onPendingTransactionUpdate - txHash: [{}], state: [{}]", - txHashW.toString(), - _state.getValue()); - } + LOG.trace( + "ApiAion0.onPendingTransactionUpdate - txHash: [{}], state: [{}]", + txHashW.toString(), + _state); if (getMsgIdMapping().get(txHashW) != null) { if (pendingStatus.remainingCapacity() == 0) { @@ -204,24 +203,22 @@ protected void pendingTxUpdate(AionTxReceipt _txRcpt, EventTx.STATE _state) { LOG.warn("ApiAion0.onPendingTransactionUpdate - txPend ingStatus queue full, drop the first message."); } - if (LOG.isTraceEnabled()) { - LOG.trace( - "ApiAion0.onPendingTransactionUpdate - the pending Tx state : [{}]", - _state.getValue()); - } + LOG.trace( + "ApiAion0.onPendingTransactionUpdate - the pending Tx state : [{}]", + _state); pendingStatus.add( new TxPendingStatus( txHashW, getMsgIdMapping().get(txHashW).getValue(), getMsgIdMapping().get(txHashW).getKey(), - _state.getValue(), + _state, ByteArrayWrapper.wrap(_txRcpt.getTransactionOutput() == null ? EMPTY_BYTE_ARRAY : _txRcpt.getTransactionOutput()), _txRcpt.getError())); - if (_state.isPending()) { + if (_state == PendingTransactionState.NEW_PENDING.getValue() || _state == PendingTransactionState.PENDING.getValue()) { pendingReceipts.put(txHashW, _txRcpt); } else { pendingReceipts.remove(txHashW); @@ -230,14 +227,12 @@ protected void pendingTxUpdate(AionTxReceipt _txRcpt, EventTx.STATE _state) { } else { if (txWait.remainingCapacity() == 0) { txWait.poll(); - if (LOG.isTraceEnabled()) { - LOG.trace("ApiAion0.onPendingTransactionUpdate - txWait queue full, drop the first message."); - } + LOG.trace("ApiAion0.onPendingTransactionUpdate - txWait queue full, drop the first message."); } // waiting origin Api call status been callback try { - txWait.put(new TxWaitingMappingUpdate(txHashW, _state.getValue(), _txRcpt)); + txWait.put(new TxWaitingMappingUpdate(txHashW, _state, _txRcpt)); } catch (InterruptedException e) { LOG.error("ApiAion0.onPendingTransactionUpdate txWait.put exception", e); } @@ -273,7 +268,7 @@ public void run() { go = false; } } catch (Exception e) { - LOG.debug("EpBlkCache - excepted out", e); + LOG.debug("EpBlkCache - exception", e); } } } @@ -288,7 +283,6 @@ public ApiAion0(IAionChain ac, AccountManager am) { this.pendingStatus = new LinkedBlockingQueue(MAP_SIZE); this.txWait = new LinkedBlockingQueue(MAP_SIZE); this.msgIdMapping = Collections.synchronizedMap(new LRUMap<>(MAP_SIZE, 100)); - this.initNrgOracle(ac); isFilterEnabled = CfgAion.inst().getApi().getZmq().isFiltersEnabled(); diff --git a/modApiServer/src/org/aion/api/server/rpc/ApiWeb3Aion.java b/modApiServer/src/org/aion/api/server/rpc/ApiWeb3Aion.java index 900097d10e..77fce30a06 100644 --- a/modApiServer/src/org/aion/api/server/rpc/ApiWeb3Aion.java +++ b/modApiServer/src/org/aion/api/server/rpc/ApiWeb3Aion.java @@ -50,7 +50,6 @@ import org.aion.evtmgr.IEventMgr; import org.aion.evtmgr.IHandler; import org.aion.evtmgr.impl.callback.EventCallback; -import org.aion.evtmgr.impl.evt.EventTx; import org.aion.mcf.blockchain.BlockHeader.BlockSealType; import org.aion.zero.impl.keystore.Keystore; import org.aion.mcf.blockchain.Block; @@ -165,7 +164,8 @@ protected void pendingTxReceived(AionTransaction _tx) { } } - protected void pendingTxUpdate(AionTxReceipt _txRcpt, EventTx.STATE _state) { + @Override + protected void pendingTxUpdate(AionTxReceipt _txRcpt, int _state) { // commenting this out because of lack support for old web3 client that we are using // TODO: re-enable this when we upgrade our web3 client /* diff --git a/modApiServer/src/org/aion/api/server/rpc/RpcMethods.java b/modApiServer/src/org/aion/api/server/rpc/RpcMethods.java index c08c213c79..e154f06fe4 100644 --- a/modApiServer/src/org/aion/api/server/rpc/RpcMethods.java +++ b/modApiServer/src/org/aion/api/server/rpc/RpcMethods.java @@ -3,10 +3,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.aion.api.server.BlockchainCallbackForApiServer; import org.aion.api.server.account.AccountManager; import org.aion.log.AionLoggerFactory; import org.aion.log.LogEnum; import org.aion.zero.impl.blockchain.AionImpl; +import org.aion.zero.impl.blockchain.IAionChain; import org.slf4j.Logger; public class RpcMethods { @@ -32,7 +34,10 @@ public RpcMethods( final List disabledMethods, final AccountManager am) { - api = new ApiWeb3Aion(AionImpl.inst(), am); + + IAionChain ac = AionImpl.inst(); + api = new ApiWeb3Aion(ac, am); + ac.setApiServiceCallback(new BlockchainCallbackForApiServer(api)); // find a way to autogen options in config using this enum, without generating circular // module dependency (right now it's manual) diff --git a/modApiServer/test/org/aion/api/server/ApiAionTest.java b/modApiServer/test/org/aion/api/server/ApiAionTest.java index 2166d71c1b..846f0beb70 100644 --- a/modApiServer/test/org/aion/api/server/ApiAionTest.java +++ b/modApiServer/test/org/aion/api/server/ApiAionTest.java @@ -11,9 +11,7 @@ import java.io.File; import java.io.IOException; import java.math.BigInteger; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Map; import org.aion.api.server.types.ArgTxCall; import org.aion.api.server.types.SyncInfo; @@ -23,7 +21,6 @@ import org.aion.crypto.ECKeyFac; import org.aion.evtmgr.impl.evt.EventBlock; import org.aion.evtmgr.impl.evt.EventDummy; -import org.aion.evtmgr.impl.evt.EventTx; import org.aion.api.server.account.AccountManager; import org.aion.zero.impl.keystore.Keystore; import org.aion.mcf.blockchain.Block; @@ -60,7 +57,7 @@ protected void pendingTxReceived(AionTransaction _tx) { } @Override - protected void pendingTxUpdate(AionTxReceipt _txRcpt, EventTx.STATE _state) { + protected void pendingTxUpdate(AionTxReceipt _txRcpt, int _state) { pendingUpdateFlag = true; } @@ -76,33 +73,6 @@ private ApiAionImpl(AionImpl impl) { } private void addEvents() { - EventTx pendingRcvd = new EventTx(EventTx.CALLBACK.PENDINGTXRECEIVED0); - AionTransaction tx = - AionTransaction.create( - key, - new byte[0], - new AionAddress(new byte[32]), - new byte[0], - new byte[0], - 0L, - 1L, - TransactionTypes.DEFAULT, null); - List l1 = new ArrayList<>(); - l1.add(tx); - l1.add(tx); - l1.add(tx); - pendingRcvd.setFuncArgs(Collections.singletonList(l1)); - - ees.add(pendingRcvd); - - EventTx pendingUpdate = new EventTx(EventTx.CALLBACK.PENDINGTXUPDATE0); - List l2 = new ArrayList<>(); - l2.add(new AionTxReceipt()); - l2.add(-1); - pendingUpdate.setFuncArgs(l2); - - ees.add(pendingUpdate); - EventBlock evBlock = new EventBlock(EventBlock.CALLBACK.ONBLOCK0); AionBlockSummary abs = new AionBlockSummary(null, null, null, null); evBlock.setFuncArgs(Collections.singletonList(abs)); @@ -171,7 +141,7 @@ public void testStartES() throws Exception { api.addEvents(); Thread.sleep(2000); api.shutDownES(); - assertTrue(api.allFlagsSet()); + assertTrue(api.onBlockFlag); } @Test diff --git a/modBoot/src/org/aion/Aion.java b/modBoot/src/org/aion/Aion.java index 2cc6fe258e..b3a7f4cba1 100644 --- a/modBoot/src/org/aion/Aion.java +++ b/modBoot/src/org/aion/Aion.java @@ -24,6 +24,7 @@ import java.util.TimeZone; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.aion.api.server.BlockchainCallbackForApiServer; import org.aion.api.server.account.AccountManager; import org.aion.api.server.http.RpcServer; import org.aion.api.server.http.RpcServerBuilder; @@ -246,7 +247,11 @@ public static void main(String args[]) { AccountManager am = null; if (cfg.getApi().getZmq().getActive()) { am = new AccountManager(AionLoggerFactory.getLogger(LogEnum.API.name())); - IHdlr handler = new HdlrZmq(new ApiAion0(ac, am)); + ApiAion0 javaAPI = new ApiAion0(ac, am); + ac.setApiServiceCallback(new BlockchainCallbackForApiServer(javaAPI)); + + IHdlr handler = new HdlrZmq(javaAPI); + processor = new ProtocolProcessor(handler, cfg.getApi().getZmq()); zmqThread = new Thread(processor, "zmq-api"); zmqThread.start(); From d214d8bfdbc2987218e5a6d3f910574aac48bf7f Mon Sep 17 00:00:00 2001 From: AionJayT Date: Wed, 4 Dec 2019 17:13:03 -0500 Subject: [PATCH 5/5] AKI-608 Removed eventMgr in the pendingState --- .../pendingState/AionPendingStateImpl.java | 67 ------------------- 1 file changed, 67 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java index 414e679616..80622876d6 100644 --- a/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java +++ b/modAionImpl/src/org/aion/zero/impl/pendingState/AionPendingStateImpl.java @@ -27,13 +27,6 @@ import org.aion.base.AionTransaction; import org.aion.base.PooledTransaction; import org.aion.base.TxUtil; -import org.aion.evtmgr.IEvent; -import org.aion.evtmgr.IEventMgr; -import org.aion.evtmgr.IHandler; -import org.aion.evtmgr.impl.callback.EventCallback; -import org.aion.evtmgr.impl.es.EventExecuteService; -import org.aion.evtmgr.impl.evt.EventBlock; -import org.aion.evtmgr.impl.evt.EventTx; import org.aion.log.AionLoggerFactory; import org.aion.log.LogEnum; import org.aion.mcf.blockchain.Block; @@ -108,16 +101,12 @@ public TransactionSortedSet() { private final ITxPool txPool; - private IEventMgr evtMgr = null; - private RepositoryCache pendingState; private AtomicReference best; private PendingTxCache pendingTxCache; - private EventExecuteService ees; - private List txBuffer; /** @@ -218,31 +207,6 @@ private synchronized void processTxBuffer() { } } - private final class EpPS implements Runnable { - - boolean go = true; - - /** - * When an object implementing interface Runnable is used to create a thread, - * starting the thread causes the object's run method to be called in that - * separately executing thread. - * - *

The general contract of the method run is that it may take any action - * whatsoever. - * - * @see Thread#run() - */ - @Override - public void run() { - while (go) { - IEvent e = ees.take(); - if (e.getEventType() == IHandler.TYPE.POISONPILL.getValue()) { - go = false; - } - } - } - } - private synchronized void backupPendingTx() { if (!backupPendingPoolAdd.isEmpty()) { @@ -320,7 +284,6 @@ public void init(final AionBlockchainImpl blockchain, boolean test) { if (!this.isSeed) { this.transactionStore = blockchain.getTransactionStore(); - this.evtMgr = blockchain.getEventMgr(); this.poolBackUpEnable = CfgAion.inst().getTx().getPoolBackup(); this.replayTxBuffer = new ArrayList<>(); this.pendingTxCache = @@ -329,14 +292,6 @@ public void init(final AionBlockchainImpl blockchain, boolean test) { this.dumpPool = test || CfgAion.inst().getTx().getPoolDump(); - ees = new EventExecuteService(1000, "EpPS", Thread.MAX_PRIORITY, LOGGER_TX); - ees.setFilter(setEvtFilter()); - - IHandler blkHandler = this.evtMgr.getHandler(IHandler.TYPE.BLOCK0.getValue()); - if (blkHandler != null) { - blkHandler.eventCallback(new EventCallback(ees, LOGGER_TX)); - } - if (poolBackUpEnable) { this.backupPendingPoolAdd = new HashMap<>(); this.backupPendingCacheAdd = new HashMap<>(); @@ -351,25 +306,7 @@ public void init(final AionBlockchainImpl blockchain, boolean test) { this.txBuffer = Collections.synchronizedList(new ArrayList<>()); } - - if (!test) { - ees.start(new EpPS()); - } - } - } - - private Set setEvtFilter() { - Set eventSN = new HashSet<>(); - - int sn = IHandler.TYPE.BLOCK0.getValue() << 8; - eventSN.add(sn + EventBlock.CALLBACK.ONBEST0.getValue()); - - if (poolBackUpEnable) { - sn = IHandler.TYPE.TX0.getValue() << 8; - eventSN.add(sn + EventTx.CALLBACK.TXBACKUP0.getValue()); } - - return eventSN; } public synchronized RepositoryCache getRepository() { @@ -1105,10 +1042,6 @@ public void shutDown() { if (this.bufferEnable) { ex.shutdown(); } - - if (ees != null) { - ees.shutdown(); - } } public synchronized void DumpPool() {