From 2cc7da1140c59e30415509fe204c7ad97b507fd5 Mon Sep 17 00:00:00 2001 From: Ali Sharif Date: Thu, 1 Mar 2018 18:49:26 -0500 Subject: [PATCH] fixed inefficient cumulative nrg calculation for batch retrievals --- .../src/org/aion/api/server/pb/ApiAion0.java | 139 ++++++++++-------- 1 file changed, 74 insertions(+), 65 deletions(-) diff --git a/modApiServer/src/org/aion/api/server/pb/ApiAion0.java b/modApiServer/src/org/aion/api/server/pb/ApiAion0.java index c4013c281c..335e3d3694 100644 --- a/modApiServer/src/org/aion/api/server/pb/ApiAion0.java +++ b/modApiServer/src/org/aion/api/server/pb/ApiAion0.java @@ -49,6 +49,7 @@ import org.aion.zero.impl.blockchain.IAionChain; import org.aion.zero.impl.types.AionBlock; import org.aion.zero.impl.types.AionBlockSummary; +import org.aion.zero.impl.types.AionTxInfo; import org.aion.zero.types.AionTransaction; import org.aion.zero.types.AionTxReceipt; import org.json.JSONArray; @@ -1235,17 +1236,15 @@ public byte[] process(byte[] request, byte[] socketId) { try { req = Message.req_getBlockDetailsByNumber.parseFrom(data); - List num = req.getBlkNumbersList().parallelStream() + List blkNum = req.getBlkNumbersList().parallelStream() .collect(Collectors.toSet()).parallelStream() .sorted().collect(Collectors.toList()); - if (num.size() > 1000) { - num = num.subList(0, 1000); + if (blkNum.size() > 1000) { + blkNum = blkNum.subList(0, 1000); } - List> blks = - num.parallelStream().filter(n -> this.getBlock(n)!= null).map(n -> Map.entry(this.getBlock(n), this.ac.getBlockchain().getTotalDifficultyByHash( - Hash256.wrap(this.getBlock(n).getHash())))).collect(Collectors.toList()); + List> blks = getBlkAndDifficultyForBlkNumList(blkNum); if (blks == null) { return ApiUtil.toReturnHeader(getApiVersion(), Message.Retcode.r_fail_function_arguments_VALUE); @@ -1286,16 +1285,10 @@ public byte[] process(byte[] request, byte[] socketId) { Long endBlock = this.getBestBlock().getNumber(); Long startBlock = (endBlock - count + 1) >= 0 ? (endBlock - count + 1) : 0; - List numbers = LongStream.rangeClosed(startBlock, endBlock) + List blkNum = LongStream.rangeClosed(startBlock, endBlock) .boxed().collect(Collectors.toList()); - List> blks = - numbers.parallelStream() - .filter(n -> this.getBlock(n)!= null) - .map(n -> Map.entry(this.getBlock(n), - this.ac.getBlockchain().getTotalDifficultyByHash( - Hash256.wrap(this.getBlock(n).getHash())))) - .collect(Collectors.toList()); + List> blks = getBlkAndDifficultyForBlkNumList(blkNum); if (blks == null) { return ApiUtil.toReturnHeader(getApiVersion(), Message.Retcode.r_fail_function_arguments_VALUE); @@ -1334,16 +1327,10 @@ public byte[] process(byte[] request, byte[] socketId) { Long endBlock = this.getBestBlock().getNumber(); Long startBlock = (endBlock - count + 1) >= 0 ? (endBlock - count + 1) : 0; - List numbers = LongStream.rangeClosed(startBlock, endBlock) + List blkNum = LongStream.rangeClosed(startBlock, endBlock) .boxed().collect(Collectors.toList()); - List> blks = - numbers.parallelStream() - .filter(n -> this.getBlock(n)!= null) - .map(n -> Map.entry(this.getBlock(n), - this.ac.getBlockchain().getTotalDifficultyByHash( - Hash256.wrap(this.getBlock(n).getHash())))) - .collect(Collectors.toList()); + List> blks = getBlkAndDifficultyForBlkNumList(blkNum); if (blks == null) { return ApiUtil.toReturnHeader(getApiVersion(), Message.Retcode.r_fail_function_arguments_VALUE); @@ -1476,23 +1463,25 @@ private List getRsp_getBlocks(List bs = blks.parallelStream() .filter(Objects::nonNull) .map(blk -> { + AionBlock b = blk.getKey(); + return Message.t_Block.newBuilder() - .setBlockNumber(blk.getKey().getNumber()) - .setDifficulty(ByteString.copyFrom(blk.getKey().getDifficulty())) - .setExtraData(ByteString.copyFrom(blk.getKey().getExtraData())) - .setHash(ByteString.copyFrom(blk.getKey().getHash())) - .setLogsBloom(ByteString.copyFrom(blk.getKey().getLogBloom())) - .setMinerAddress(ByteString.copyFrom(blk.getKey().getCoinbase().toBytes())) - .setNonce(ByteString.copyFrom(blk.getKey().getNonce())) - .setNrgConsumed(blk.getKey().getNrgConsumed()) - .setNrgLimit(blk.getKey().getNrgLimit()) - .setParentHash(ByteString.copyFrom(blk.getKey().getParentHash())) - .setTimestamp(blk.getKey().getTimestamp()) - .setTxTrieRoot(ByteString.copyFrom(blk.getKey().getTxTrieRoot())) - .setReceiptTrieRoot(ByteString.copyFrom(blk.getKey().getReceiptsRoot())) - .setStateRoot(ByteString.copyFrom(blk.getKey().getStateRoot())) - .setSize(blk.getKey().getEncoded().length) - .setSolution(ByteString.copyFrom(blk.getKey().getHeader().getSolution())) + .setBlockNumber(b.getNumber()) + .setDifficulty(ByteString.copyFrom(b.getDifficulty())) + .setExtraData(ByteString.copyFrom(b.getExtraData())) + .setHash(ByteString.copyFrom(b.getHash())) + .setLogsBloom(ByteString.copyFrom(b.getLogBloom())) + .setMinerAddress(ByteString.copyFrom(b.getCoinbase().toBytes())) + .setNonce(ByteString.copyFrom(b.getNonce())) + .setNrgConsumed(b.getNrgConsumed()) + .setNrgLimit(b.getNrgLimit()) + .setParentHash(ByteString.copyFrom(b.getParentHash())) + .setTimestamp(b.getTimestamp()) + .setTxTrieRoot(ByteString.copyFrom(b.getTxTrieRoot())) + .setReceiptTrieRoot(ByteString.copyFrom(b.getReceiptsRoot())) + .setStateRoot(ByteString.copyFrom(b.getStateRoot())) + .setSize(b.getEncoded().length) + .setSolution(ByteString.copyFrom(b.getHeader().getSolution())) .setTotalDifficulty(ByteString.copyFrom(blk.getValue().toByteArray())) .build(); }).collect(Collectors.toList()); @@ -1502,36 +1491,45 @@ private List getRsp_getBlocks(List getRsp_getBlockDetails(List> blks) { List bds = blks.parallelStream().filter(Objects::nonNull).map(blk -> { - + AionBlock b = blk.getKey(); Message.t_BlockDetail.Builder builder = Message.t_BlockDetail.newBuilder() - .setBlockNumber(blk.getKey().getNumber()) - .setDifficulty(ByteString.copyFrom(blk.getKey().getDifficulty())) - .setExtraData(ByteString.copyFrom(blk.getKey().getExtraData())) - .setHash(ByteString.copyFrom(blk.getKey().getHash())) - .setLogsBloom(ByteString.copyFrom(blk.getKey().getLogBloom())) - .setMinerAddress(ByteString.copyFrom(blk.getKey().getCoinbase().toBytes())) - .setNonce(ByteString.copyFrom(blk.getKey().getNonce())) - .setNrgConsumed(blk.getKey().getNrgConsumed()) - .setNrgLimit(blk.getKey().getNrgLimit()) - .setParentHash(ByteString.copyFrom(blk.getKey().getParentHash())) - .setTimestamp(blk.getKey().getTimestamp()) - .setTxTrieRoot(ByteString.copyFrom(blk.getKey().getTxTrieRoot())) - .setReceiptTrieRoot(ByteString.copyFrom(blk.getKey().getReceiptsRoot())) - .setStateRoot(ByteString.copyFrom(blk.getKey().getStateRoot())) - .setSize(blk.getKey().getEncoded().length) - .setSolution(ByteString.copyFrom(blk.getKey().getHeader().getSolution())) + .setBlockNumber(b.getNumber()) + .setDifficulty(ByteString.copyFrom(b.getDifficulty())) + .setExtraData(ByteString.copyFrom(b.getExtraData())) + .setHash(ByteString.copyFrom(b.getHash())) + .setLogsBloom(ByteString.copyFrom(b.getLogBloom())) + .setMinerAddress(ByteString.copyFrom(b.getCoinbase().toBytes())) + .setNonce(ByteString.copyFrom(b.getNonce())) + .setNrgConsumed(b.getNrgConsumed()) + .setNrgLimit(b.getNrgLimit()) + .setParentHash(ByteString.copyFrom(b.getParentHash())) + .setTimestamp(b.getTimestamp()) + .setTxTrieRoot(ByteString.copyFrom(b.getTxTrieRoot())) + .setReceiptTrieRoot(ByteString.copyFrom(b.getReceiptsRoot())) + .setStateRoot(ByteString.copyFrom(b.getStateRoot())) + .setSize(b.getEncoded().length) + .setSolution(ByteString.copyFrom(b.getHeader().getSolution())) .setTotalDifficulty(ByteString.copyFrom(blk.getValue().toByteArray())); - List txs = blk.getKey().getTransactionsList(); + List txs = b.getTransactionsList(); - List tds = txs.parallelStream().filter(Objects::nonNull).map(tx -> { - TxRecpt rt = this.getTransactionReceipt(tx.getHash()); + List tds = new ArrayList<>(); + long cumulativeNrg = 0L; - List tles = Arrays.asList(rt.logs).parallelStream().map(log -> Message.t_LgEle.newBuilder() - .setData(ByteString.copyFrom(ByteUtil.hexStringToBytes(log.data))) - .setAddress(ByteString.copyFrom(Address.wrap(log.address).toBytes())) - .addAllTopics(Arrays.asList(log.topics)) - .build()).filter(Objects::nonNull).collect(Collectors.toList()); + // Can't use parallel streams here since we need to compute cumulativeNrg, which is done iteratively + for (AionTransaction tx : txs) { + AionTxInfo ti = this.ac.getAionHub().getBlockchain().getTransactionInfo(tx.getHash()); + cumulativeNrg += ti.getReceipt().getEnergyUsed(); + TxRecpt rt = new TxRecpt(b, ti, cumulativeNrg); + + List tles = Arrays.asList(rt.logs).parallelStream() + .map(log -> { + return Message.t_LgEle.newBuilder() + .setData(ByteString.copyFrom(ByteUtil.hexStringToBytes(log.data))) + .setAddress(ByteString.copyFrom(Address.wrap(log.address).toBytes())) + .addAllTopics(Arrays.asList(log.topics)) + .build(); + }).filter(Objects::nonNull).collect(Collectors.toList()); Message.t_TxDetail.Builder tdBuilder = Message.t_TxDetail.newBuilder() .setData(ByteString.copyFrom(tx.getData())) @@ -1549,8 +1547,8 @@ private List getRsp_getBlockDetails(List getRsp_getBlockDetails(List> getBlkAndDifficultyForBlkNumList(List blkNum) { + return blkNum.parallelStream() + .map(n -> { + AionBlock b = this.getBlock(n); + return Map.entry(b, this.ac.getAionHub().getBlockStore().getTotalDifficultyForHash(b.getHash())); + }) + .collect(Collectors.toList()); + } + @Override public byte getApiVersion() { return JAVAAPI_VAR;