Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Services 1085 save inactive items and db validation 2 #960

Open
wants to merge 18 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions src/common/persistence/persistence.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ export class PersistenceService {
return await this.execute(this.saveTags.name, this.tagsRepository.saveTags(tags));
}

async saveTagsOrIgnore(tags: TagEntity[]): Promise<void> {
await this.execute(this.saveTagsOrIgnore.name, this.tagsRepository.saveTagsOrIgnore(tags));
}

async getCollectionStats(
identifier: string,
marketplaceKey: string = undefined,
Expand Down Expand Up @@ -456,6 +460,13 @@ export class PersistenceService {
return await this.execute(this.getLastOrdersByAuctionIds.name, this.ordersRepository.getLastOrdersByAuctionIds(auctionIds));
}

async getOrdersByAuctionIdsGroupByAuctionId(auctionIds: number[]): Promise<any[]> {
return await this.execute(
this.getOrdersByAuctionIdsGroupByAuctionId.name,
this.ordersRepository.getOrdersByAuctionIdsGroupByAuctionId(auctionIds),
);
}

async getOrdersByAuctionIds(auctionIds: number[]): Promise<any[]> {
return await this.execute(this.getOrdersByAuctionIds.name, this.ordersRepository.getOrdersByAuctionIds(auctionIds));
}
Expand All @@ -468,8 +479,8 @@ export class PersistenceService {
return await this.execute(this.saveOrder.name, this.ordersRepository.saveOrder(order));
}

async saveBulkOrders(orders: OrderEntity[]) {
return await this.execute(this.saveBulkOrders.name, this.ordersRepository.saveBulkOrders(orders));
async saveBulkOrdersOrUpdateAndFillId(orders: OrderEntity[]) {
return await this.execute(this.saveBulkOrdersOrUpdateAndFillId.name, this.ordersRepository.saveBulkOrdersOrUpdateAndFillId(orders));
}

async updateOrderWithStatus(order: OrderEntity, status: OrderStatusEnum) {
Expand Down Expand Up @@ -640,8 +651,11 @@ export class PersistenceService {
return await this.execute(this.insertAuction.name, this.auctionsRepository.insertAuction(auction));
}

async saveBulkAuctions(auctions: AuctionEntity[]): Promise<void> {
return await this.execute(this.saveBulkAuctions.name, this.auctionsRepository.saveBulkAuctions(auctions));
async saveBulkAuctionsOrUpdateAndFillId(auctions: AuctionEntity[]): Promise<void> {
return await this.execute(
this.saveBulkAuctionsOrUpdateAndFillId.name,
this.auctionsRepository.saveBulkAuctionsOrUpdateAndFillId(auctions),
);
}

async rollbackAuctionAndOrdersByHash(blockHash: string): Promise<any> {
Expand Down Expand Up @@ -701,8 +715,8 @@ export class PersistenceService {
return await this.execute(this.saveOffer.name, this.offersRepository.saveOffer(offer));
}

async saveBulkOffers(offers: OfferEntity[]): Promise<void> {
return await this.execute(this.saveBulkOffers.name, this.offersRepository.saveBulkOffers(offers));
async saveBulkOffersOrUpdateAndFillId(offers: OfferEntity[]): Promise<void> {
return await this.execute(this.saveBulkOffersOrUpdateAndFillId.name, this.offersRepository.saveBulkOffersOrUpdateAndFillId(offers));
}

async getOffersThatReachedDeadline(): Promise<OfferEntity[]> {
Expand Down
5 changes: 3 additions & 2 deletions src/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@
"updateAllNftTraitsBatchSize": 10000,
"getNftsForScamInfoBatchSize": 100,
"elasticMaxBatch": 10000,
"dbBatch": 1000,
"dbBatch": 10000,
"complexityLevel": 200,
"getLogsFromElasticBatchSize": 1000,
"dbMaxTimestamp": 2147483647,
"defaultPageOffset": 0,
"defaultPageSize": 10,
"dbMaxDenominatedValue": 99999999999999999,
"dbMaxTagLength": 20
"dbMaxTagLength": 20,
"marketplaceReindexDataMaxInMemoryItems": 25000
},
"elasticDictionary": {
"scamInfo": {
Expand Down
136 changes: 133 additions & 3 deletions src/db/auctions/auctions.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import {
getLowestAuctionForIdentifiersAndMarketplace,
getOnSaleAssetsCountForCollection,
} from './sql.queries';
import { CpuProfiler } from '@multiversx/sdk-nestjs-monitoring';
import { OrderEntity } from '../orders';
import { TagEntity } from './tags.entity';

@Injectable()
export class AuctionsRepository {
Expand Down Expand Up @@ -421,9 +424,136 @@ export class AuctionsRepository {
return await this.auctionsRepository.save(auction);
}

async saveBulkAuctions(auctions: AuctionEntity[]): Promise<void> {
await this.auctionsRepository.save(auctions, {
chunk: constants.dbBatch,
async getBulkAuctionsByMarketplaceAndAuctionIds(marketplaceKey: string, marketplaceAuctionIds: number[]): Promise<AuctionEntity[]> {
return await this.auctionsRepository
.createQueryBuilder('a')
.select('id, marketplaceAuctionId')
.where(`a.marketplaceKey = '${marketplaceKey}'`)
.andWhere(`a.marketplaceAuctionId IN(:...marketplaceAuctionIds)`, {
marketplaceAuctionIds: marketplaceAuctionIds,
})
.orderBy('a.marketplaceAuctionId', 'ASC')
.execute();
}

// async saveBulkAuctions(auctions: AuctionEntity[]): Promise<void> {
// await this.auctionsRepository.save(auctions, {
// chunk: 1000,
// });
// }

// async saveBulkAuctionsOrUpdateAndFillId(auctions: AuctionEntity[]): Promise<void> {
// const batchSize = 1000;
// if (auctions.length === 0) {
// return;
// }

// const connection = this.auctionsRepository.manager.connection;

// await connection.transaction(async (transactionalEntityManager) => {
// for (let i = 0; i < auctions.length; i += batchSize) {
// const batch = auctions.slice(i, i + batchSize);
// console.log('Processing batch number', i);

// const cpu = new CpuProfiler();
// // for (const item of batch) {
// const currentQueryBuilder = transactionalEntityManager
// .createQueryBuilder()
// .from(AuctionEntity, 'auction')
// .insert()
// .values(batch)
// .orUpdate({
// conflict_target: ['marketplaceKey', 'marketplaceAuctionId'],
// overwrite: [
// 'collection',
// 'nrAuctionedTokens',
// 'identifier',
// 'nonce',
// 'status',
// 'type',
// 'paymentToken',
// 'paymentNonce',
// 'ownerAddress',
// 'minBidDiff',
// 'minBid',
// 'minBidDenominated',
// 'maxBid',
// 'maxBidDenominated',
// 'startDate',
// 'tags',
// 'blockHash',
// ],
// });
// console.log(currentQueryBuilder.getQueryAndParameters());
// // Include related orders

// await currentQueryBuilder.execute();
// // }

// cpu.stop(`batch ${i}`);
// }

// console.log('Bulk insert or update completed successfully.');
// });
// }

async saveBulkAuctionsOrUpdateAndFillId(auctions: AuctionEntity[]): Promise<void> {
const batchSize = 1000;
if (auctions.length === 0) {
return;
}

const connection = this.auctionsRepository.manager.connection;

await connection.transaction(async (transactionalEntityManager) => {
for (let i = 0; i < auctions.length; i += batchSize) {
const batch = auctions.slice(i, i + batchSize);
console.log('Processing batch number', i);

const cpu = new CpuProfiler();
const response = await transactionalEntityManager
.createQueryBuilder()
.from(AuctionEntity, 'auction')
.insert()
.into(AuctionEntity)
.values(batch)
.execute();
console.log({ auctions: response?.identifiers?.length });

let orders = [];
let tags = [];
for (const item of batch) {
if (item.orders) {
item.orders.forEach((i) => {
i.auction = item;
});
orders.push(...item.orders);
}
if (item.tagEntities && item.tagEntities.length > 0) {
console.log(item.tagEntities);
item.tagEntities.forEach((i) => {
i.auction = item;
});

tags.push(...item.tagEntities);
}
}
if (orders.length) {
const res = await transactionalEntityManager.createQueryBuilder().insert().into(OrderEntity).values(orders).execute();

console.log({ orders: res?.identifiers?.length });
}

if (tags.length) {
const res = await transactionalEntityManager.createQueryBuilder().insert().into(TagEntity).values(tags).execute();

console.log({ orders: res?.identifiers?.length });
}

cpu.stop(`batch ${i}`);
}

console.log('Bulk insert or update completed successfully.');
});
}

Expand Down
34 changes: 34 additions & 0 deletions src/db/auctions/tags.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,38 @@ export class TagsRepository {
throw err;
}
}

async getBulkTagsByAuctionIds(auctionIds: number[]): Promise<TagEntity[]> {
return await this.tagsRepository
.createQueryBuilder('t')
.select('t.id, t.auctionId, t.tag')
.where(`t.auctionId IN(:...auctionIds)`, {
auctionIds: auctionIds,
})
.execute();
}

async saveTagsOrIgnore(tags: TagEntity[]): Promise<void> {
if (tags.length === 0) {
return;
}
const dbTags = await this.getBulkTagsByAuctionIds(
tags.map((t) => t.auctionId),
);
for (const dbTag of dbTags) {
const correspondingTagIndex = tags.findIndex(
(t) => t.auctionId === dbTag.auctionId && t.tag === dbTag.tag,
);
if (correspondingTagIndex !== -1) {
tags[correspondingTagIndex].id = dbTag.id;
}
}
await this.tagsRepository
.createQueryBuilder()
.insert()
.into('tags')
.values(tags.filter((t) => t.id === undefined))
.orIgnore(true)
.execute();
}
}
1 change: 1 addition & 0 deletions src/db/marketplaces/marketplace-events.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Column, Entity, Index, Unique } from 'typeorm';

@Entity('marketplace_events')
@Unique('MarketplaceEventsEntity_UQ_EVENT', ['txHash', 'eventOrder', 'isTx'])
@Index('idx_marketplaceAddress_timestamp', ['marketplaceAddress', 'timestamp'])
export class MarketplaceEventsEntity extends BaseEntity {
@Column({ length: 64 })
txHash: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class AddMarketplaceAddresTimestampIndex1699949169946 implements MigrationInterface {
name = 'AddMarketplaceAddresTimestampIndex1699949169946'

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE INDEX \`idx_marketplaceAddress_timestamp\` ON \`marketplace_events\` (\`marketplaceAddress\`, \`timestamp\`)`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX \`idx_marketplaceAddress_timestamp\` ON \`marketplace_events\``);
}

}
62 changes: 58 additions & 4 deletions src/db/offers/offers.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,64 @@ export class OffersRepository {
return savedOffer;
}

async saveBulkOffers(orders: OfferEntity[]): Promise<void> {
await this.offersRepository.save(orders, {
chunk: constants.dbBatch,
});
async getBulkIdsByMarketplaceAndOfferIds(
marketplaceKey: string,
marketplaceOfferIds: number[],
): Promise<number[]> {
const res = await this.offersRepository
.createQueryBuilder('o')
.select('id')
.where(`o.marketplaceKey = '${marketplaceKey}'`)
.andWhere(`o.marketplaceOfferId IN(:...marketplaceOfferIds)`, {
marketplaceOfferIds: marketplaceOfferIds,
})
.orderBy('marketplaceOfferId', 'ASC')
.execute();
return res.map((o) => o.id);
}

async saveBulkOffersOrUpdateAndFillId(offers: OfferEntity[]): Promise<void> {
if (offers.length === 0) {
return;
}
const saveOrUpdateResponse = await this.offersRepository
.createQueryBuilder()
.insert()
.into('offers')
.values(offers)
.orUpdate({
overwrite: [
'creationDate',
'modifiedDate',
'boughtTokensNo',
'status',
'priceToken',
'priceNonce',
'priceAmount',
'priceAmountDenominated',
'ownerAddress',
'endDate',
'blockHash',
],
conflict_target: ['marketplaceOfferId', 'marketplaceKey'],
})
.updateEntity(false)
.execute();
if (
saveOrUpdateResponse.identifiers.length === 0 ||
offers.findIndex((a) => a.id === undefined) !== -1
) {
const ids = await this.getBulkIdsByMarketplaceAndOfferIds(
offers?.[0]?.marketplaceKey,
offers?.map((o) => o.marketplaceOfferId),
);
for (let i = 0; i < offers.length; i++) {
offers[i].id = ids[i];
}
}
if (offers.findIndex((a) => a.id === undefined) !== -1) {
throw new Error('oooppps');
}
}

async updateOfferWithStatus(offer: OfferEntity, status: OfferStatusEnum) {
Expand Down
Loading
Loading