Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

purge functionality based on blockcount and time #362

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions app/persistence/fabric/CRUDService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,115 @@ export class CRUDService {
return row;
}

/**
*
* Delete the old data based on time purge.
*
* @param {*} channel_genesis_hash
* @param {*} daysToPurge
* @returns
* @memberof CRUDService
*/
async deleteOldData(channel_genesis_hash: string, network_id: string, daysToPurge: number) {
//Fetching the blockto and block from value as per the days required
let blockfrom = await this.sql.updateBySql(
`SELECT CASE WHEN MIN(blocknum) is null THEN 0 ELSE MIN(blocknum) end as blockfrom FROM blocks WHERE createdt < CURRENT_DATE - INTERVAL '1 day' *$1 and channel_genesis_hash=$2 and network_name = $3`,
[daysToPurge, channel_genesis_hash, network_id]);
let blockto = await this.sql.updateBySql(
`SELECT CASE WHEN MAX(blocknum) is null THEN 0 ELSE MAX(blocknum) end as blockto FROM blocks WHERE createdt < CURRENT_DATE - INTERVAL '1 day' *$1 and channel_genesis_hash=$2 and network_name = $3`,
[daysToPurge, channel_genesis_hash, network_id]);
if (blockto[0].blockto != 0) {
//Deleting the txn and blocks table
await this.sql.updateBySql(
`DELETE FROM transactions WHERE blockid>= $1 and blockid<=$2 and channel_genesis_hash=$3 and network_name = $4`,
[blockfrom[0].blockfrom, blockto[0].blockto, channel_genesis_hash, network_id]
);
await this.sql.updateBySql(
`DELETE FROM blocks WHERE blocknum>= $1 and blocknum<=$2 and channel_genesis_hash=$3 and network_name = $4`,
[blockfrom[0].blockfrom, blockto[0].blockto, channel_genesis_hash, network_id]
);
await this.explorerTableUpdation(
blockfrom[0].blockfrom, blockto[0].blockto, "TIME", channel_genesis_hash, network_id);
return true;
}
return false;
}

/**
*
* Fetch block to value from explorer_audit table.
*
* @param {*} channel_genesis_hash
* @param {*} mode
* @returns
* @memberof CRUDService
*/
async fetchLastBlockToFromExplorerAudit(mode: string, channel_genesis_hash: string, network_id: string) {
let blocktoValue = await this.sql.updateBySql(
`SELECT blockto FROM explorer_audit where mode =$1 and channel_genesis_hash=$2 and network_name = $3`, [mode, channel_genesis_hash, network_id]);
return blocktoValue[0].blockto;

}

/**
*
* Delete the old data based on block count purge.
*
* @param {*} channel_genesis_hash
* @param {*} blockCount
* @returns
* @memberof CRUDService
*/
async deleteBlock(network_name: string, channel_genesis_hash: string, blockCount: number) {
const count: any = await this.sql.getRowsBySQlCase(
' select count(*) as count from blocks where channel_genesis_hash=$1 and network_name = $2 ',
[channel_genesis_hash, network_name]
);
const rowCount: number = count.count;
let rowsToDelete: number = 0;
if (rowCount > blockCount) {
rowsToDelete = rowCount - blockCount;
}
if (rowsToDelete > 0) {
let blockfrom = await this.sql.updateBySql(
`SELECT min(blocknum) as blockfrom FROM blocks WHERE channel_genesis_hash=$1 and network_name = $2`, [channel_genesis_hash, network_name]);
let blockto = await this.sql.updateBySql(
`SELECT max(blocknum) as blockto FROM blocks WHERE channel_genesis_hash=$1 and network_name = $2`, [channel_genesis_hash, network_name]);
await this.sql.updateBySql(
`DELETE FROM transactions WHERE blockid>= $1 and blockid<=$2 and channel_genesis_hash=$3 and network_name = $4`,
[blockfrom[0].blockfrom, blockto[0].blockto - blockCount, channel_genesis_hash, network_name]
);
await this.sql.updateBySql(
`DELETE FROM blocks WHERE blocknum>= $1 and blocknum<=$2 and channel_genesis_hash=$3 and network_name = $4`,
[blockfrom[0].blockfrom, blockto[0].blockto - blockCount, channel_genesis_hash, network_name]
);
await this.explorerTableUpdation(
blockfrom[0].blockfrom, blockto[0].blockto - blockCount, "BLOCKCOUNT", channel_genesis_hash, network_name);

return true;
}
return false;
}

/**
*
* Update the explorer_audit table based on the details of purge.
*
* @param {*} channel_genesis_hash
* @param {*} blockFrom
* @param {*} blockTo
* @param {*} purgeMode
* @returns
* @memberof CRUDService
*/
async explorerTableUpdation(blockFrom: number, blockTo: number, purgeMode: string, channel_genesis_hash: string, network_name: string) {
const updateReponse = await this.sql.updateBySql(
`insert into explorer_audit (lastupdated,status,blockfrom,blockto,mode,channel_genesis_hash,network_name) values ($1,$2,$3,$4,$5,$6,$7) on conflict(mode,channel_genesis_hash,network_name)
do update set lastupdated = $1, status = $2, blockfrom = $3, blockto = $4, mode = $5, channel_genesis_hash =$6,network_name= $7 `,
[new Date(), "SUCCESS", blockFrom, blockTo, purgeMode, channel_genesis_hash, network_name]);
logger.info(" Data added to explorer_audit table", updateReponse);
}

}

/**
Expand Down
20 changes: 20 additions & 0 deletions app/persistence/fabric/postgreSQL/db/explorerpg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,26 @@ CREATE TABLE users
);
ALTER table users owner to :user;

-- ---------------------------
-- Table structure for `explorer audit`
-- ----------------------------
DROP TABLE IF EXISTS explorer_audit;
CREATE TYPE purgeMode AS ENUM('TIME','BLOCKCOUNT');
CREATE TABLE explorer_audit
(
id SERIAL PRIMARY KEY,
lastupdated Timestamp DEFAULT NULL,
status varchar(255) DEFAULT NULL,
blockfrom integer DEFAULT NULL,
blockto integer DEFAULT NULL,
mode purgeMode DEFAULT NULL,
channel_genesis_hash character varying(256) DEFAULT NULL,
network_name varchar(255)
);
CREATE UNIQUE INDEX exp_audit_id
ON explorer_audit (network_name,mode,channel_genesis_hash);
ALTER table explorer_audit owner to :user;

DROP TABLE IF EXISTS write_lock;
CREATE TABLE write_lock
(
Expand Down
5 changes: 4 additions & 1 deletion app/platform/fabric/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
"network-configs": {
"test-network": {
"name": "Test Network",
"profile": "./connection-profile/test-network.json"
"profile": "./connection-profile/test-network.json",
"blockCount": 5,
"purgeMode":"NONE",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we will now have the following purgemode NONE, TIME,BLOCKCOUNT, are we also handling the NONE case as well (which means no purge is needed).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we are handling the NONE case(* No purge) too.

"daysToPurge": 7
}
},
"license": "Apache-2.0"
Expand Down
51 changes: 50 additions & 1 deletion app/platform/fabric/sync/SyncService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@ import { explorerError } from '../../../common/ExplorerMessage';
import * as FabricConst from '../../../platform/fabric/utils/FabricConst';
import * as FabricUtils from '../../../platform/fabric/utils/FabricUtils';

import * as path from 'path';
import * as fs from 'fs';

const logger = helper.getLogger('SyncServices');

const fabric_const = FabricConst.fabric.const;
const purge_mode = FabricConst.PurgeModes;
const config_path = path.resolve(__dirname, '../config.json');
const all_config = JSON.parse(fs.readFileSync(config_path, 'utf8'));
const network_configs = all_config[fabric_const.NETWORK_CONFIGS];

// Transaction validation code
const _validation_codes = {};
Expand Down Expand Up @@ -361,6 +368,10 @@ export class SyncServices {

// Get channel information from ledger
const channelInfo = await client.fabricGateway.queryChainInfo(channel_name);
//Get purge configuration from config.json
const purgeMode = network_configs[network_id].purgeMode.toUpperCase();
const daysToPurge = network_configs[network_id].daysToPurge;
const blockCount = network_configs[network_id].blockCount;

if (!channelInfo) {
logger.info(`syncBlocks: Failed to retrieve channelInfo >> ${channel_name}`);
Expand Down Expand Up @@ -390,7 +401,20 @@ export class SyncServices {
result.missing_id
);
if (block) {
await this.processBlockEvent(client, block, noDiscovery);
if (this.validatePurgemode(purgeMode)) {
const lastBlockTo = await this.persistence.getCrudService()
.fetchLastBlockToFromExplorerAudit(purgeMode, channel_genesis_hash, network_id);
if (result.missing_id > lastBlockTo) {
await this.processBlockEvent(client, block, noDiscovery);
}
else {
logger.info(`Not saving this block to DB since we are keeping only recent data # ${result.missing_id}`);
}
}
else {
await this.processBlockEvent(client, block, noDiscovery);
logger.debug("Purge unsuccessful since purge inputs are not matching");
}
}
} catch {
logger.error(`Failed to process Block # ${result.missing_id}`);
Expand All @@ -399,6 +423,24 @@ export class SyncServices {
} else {
logger.debug('Missing blocks not found for %s', channel_name);
}
if (this.validatePurgemode(purgeMode)) {
let response: boolean;
switch (purgeMode) {
case purge_mode[0]: {
response = await this.persistence.
getCrudService().
deleteOldData(channel_genesis_hash, network_id, daysToPurge);
break;
}
case purge_mode[1]: {
Comment on lines +429 to +435
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check if we can use the enums here instead of purge_mode[0] and purge_mode[1]

response = await this.persistence
.getCrudService()
.deleteBlock(network_id, channel_genesis_hash, blockCount);
break;
}
}
logger.info(`Result of Purging based on ${purgeMode} :`, response);
}
const index = this.synchInProcess.indexOf(synch_key);
this.synchInProcess.splice(index, 1);
logger.info(`syncBlocks: Finish >> ${synch_key}`);
Expand Down Expand Up @@ -736,6 +778,13 @@ export class SyncServices {
}
}

validatePurgemode(purgeMode: string) {
if (Object.values(purge_mode).includes(purgeMode)) {
return true;
}
else return false;
}

/**
*
*
Expand Down
5 changes: 5 additions & 0 deletions app/platform/fabric/utils/FabricConst.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ export const fabric = {
NOTITY_TYPE_CLIENTERROR: '6'
}
};

export enum PurgeModes {
'TIME',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have the NONE as purge mode, then we need to handle that case, can you check this case once.

'BLOCKCOUNT'
Comment on lines +22 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about having the Enums like this

PurgeModes {
	TIME = "TIME"
}

};