Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: per-miner & per-client daily deal stats #336

Merged
merged 16 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,14 @@ export const evaluate = async ({

if (createPgClient) {
try {
await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements: measurements })
await updatePublicStats({
createPgClient,
committees,
honestMeasurements,
allMeasurements: measurements,
findDealClients: (minerId, cid) => sparkRoundDetails.retrievalTasks
.find(t => t.cid === cid && t.minerId === minerId)?.clients
})
} catch (err) {
console.error('Cannot update public stats.', err)
ignoredErrors.push(err)
Expand Down
120 changes: 96 additions & 24 deletions lib/public-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ const debug = createDebug('spark:public-stats')
* @param {Iterable<Committee>} args.committees
* @param {import('./preprocess.js').Measurement[]} args.honestMeasurements
* @param {import('./preprocess.js').Measurement[]} args.allMeasurements
* @param {(minerId: string, cid: string) => (string[] | undefined)} args.findDealClients
*/
export const updatePublicStats = async ({ createPgClient, committees, honestMeasurements, allMeasurements }) => {
export const updatePublicStats = async ({ createPgClient, committees, honestMeasurements, allMeasurements, findDealClients }) => {
/** @type {Map<string, {total: number, successful: number}>} */
const minerRetrievalStats = new Map()
for (const m of honestMeasurements) {
Expand All @@ -32,7 +33,7 @@ export const updatePublicStats = async ({ createPgClient, committees, honestMeas
await updateRetrievalStats(pgClient, minerId, retrievalStats)
}
await updateIndexerQueryStats(pgClient, committees)
await updateDailyDealsStats(pgClient, committees)
await updateDailyDealsStats(pgClient, committees, findDealClients)
await updatePlatformStats(pgClient, honestMeasurements, allMeasurements)
} finally {
await pgClient.end()
Expand Down Expand Up @@ -101,37 +102,108 @@ const updateIndexerQueryStats = async (pgClient, committees) => {
/**
* @param {pg.Client} pgClient
* @param {Iterable<Committee>} committees
* @param {(minerId: string, cid: string) => (string[] | undefined)} findDealClients
*/
const updateDailyDealsStats = async (pgClient, committees) => {
let total = 0
let indexed = 0
let retrievable = 0
const updateDailyDealsStats = async (pgClient, committees, findDealClients) => {
/** @type {Map<string, Map<string, {
* tested: number;
* majority_found: number;
* indexed: number;
* indexed_http: number;
* retrievable: number;
* }>>} */
const minerClientDealStats = new Map()
for (const c of committees) {
total++
const { minerId, cid } = c.retrievalTask
let clients = findDealClients(minerId, cid)
if (!(clients && clients.length)) {
bajtos marked this conversation as resolved.
Show resolved Hide resolved
console.warn(`Invalid retrieval task (${minerId}, ${cid}): no deal clients found. Using a placeholder value 'no-clients'.`)
bajtos marked this conversation as resolved.
Show resolved Hide resolved
clients = ['no-clients']
}

const evaluation = c.evaluation
if (!evaluation) continue
if (evaluation.indexerResult === 'OK' || evaluation.indexerResult === 'HTTP_NOT_ADVERTISED') {
indexed++
let clientDealStats = minerClientDealStats.get(minerId)
if (!clientDealStats) {
clientDealStats = new Map()
minerClientDealStats.set(minerId, clientDealStats)
}
if (evaluation.retrievalResult === 'OK') {
retrievable++

for (const clientId of clients) {
let stats = clientDealStats.get(clientId)
if (!stats) {
stats = {
tested: 0,
majority_found: 0,
indexed: 0,
indexed_http: 0,
retrievable: 0
}
clientDealStats.set(clientId, stats)
}

stats.tested++

const evaluation = c.evaluation
if (!evaluation) continue

stats.majority_found++

if (evaluation.indexerResult === 'OK' || evaluation.indexerResult === 'HTTP_NOT_ADVERTISED') {
stats.indexed++
}

if (evaluation.indexerResult === 'OK') {
stats.indexed_http++
}

if (evaluation.retrievalResult === 'OK') {
stats.retrievable++
}
}
}

debug('Updating public stats - daily deals: total += %s indexed += %s retrievable += %s', total, indexed, retrievable)
// Convert the nested map to an array for the query
const flatStats = Array.from(minerClientDealStats.entries()).flatMap(
([minerId, clientDealStats]) => Array.from(clientDealStats.entries()).flatMap(
([clientId, stats]) => ({ minerId, clientId, ...stats })
)
)

if (debug.enabled) {
debug(
'Updating public stats - daily deals: tested += %s majority_found += %s indexed += %s retrievable += %s',
flatStats.reduce((sum, stat) => sum + stat.tested, 0),
flatStats.reduce((sum, stat) => sum + stat.majority_found, 0),
flatStats.reduce((sum, stat) => sum + stat.indexed, 0),
flatStats.reduce((sum, stat) => sum + stat.retrievable, 0)
)
}

await pgClient.query(`
INSERT INTO daily_deals
(day, total, indexed, retrievable)
VALUES
(now(), $1, $2, $3)
ON CONFLICT(day) DO UPDATE SET
total = daily_deals.total + $1,
indexed = daily_deals.indexed + $2,
retrievable = daily_deals.retrievable + $3
(day, miner_id, client_id, tested, majority_found, indexed, indexed_http, retrievable)
VALUES (
now(),
unnest($1::text[]),
unnest($2::text[]),
unnest($3::int[]),
unnest($4::int[]),
unnest($5::int[]),
unnest($6::int[]),
unnest($7::int[])
)
ON CONFLICT(day, miner_id, client_id) DO UPDATE SET
tested = daily_deals.tested + EXCLUDED.tested,
majority_found = daily_deals.majority_found + EXCLUDED.majority_found,
indexed = daily_deals.indexed + EXCLUDED.indexed,
indexed_http = daily_deals.indexed_http + EXCLUDED.indexed_http,
retrievable = daily_deals.retrievable + EXCLUDED.retrievable
`, [
total,
indexed,
retrievable
flatStats.map(stat => stat.minerId),
flatStats.map(stat => stat.clientId),
flatStats.map(stat => stat.tested),
flatStats.map(stat => stat.majority_found),
flatStats.map(stat => stat.indexed),
flatStats.map(stat => stat.indexed_http),
flatStats.map(stat => stat.retrievable)
])
}
1 change: 1 addition & 0 deletions lib/typings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export {
export interface RetrievalTask {
cid: string;
minerId: string;
clients?: string[];
bajtos marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
35 changes: 35 additions & 0 deletions migrations/014.do.deal-retrievability-score.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- For each (day, miner_id, client_id), we want to know the following numbers (counts):
-- * `tested`: (NEW) total deals tested
-- * `indexed`: deals announcing retrievals to IPNI (HTTP or Graphsync retrievals)
-- * `indexed_http`: (NEW) deals announcing HTTP retrievals to IPNI
-- * `majority_found`: (NEW) deals where we found a majority agreeing on the same result
-- * `retrievable`: deals where the majority agrees the content can be retrieved

ALTER TABLE daily_deals ADD COLUMN miner_id TEXT;
UPDATE daily_deals SET miner_id = 'all-combined';
ALTER TABLE daily_deals ALTER COLUMN miner_id SET NOT NULL;

ALTER TABLE daily_deals ADD COLUMN client_id TEXT;
UPDATE daily_deals SET client_id = 'all-combined';
ALTER TABLE daily_deals ALTER COLUMN client_id SET NOT NULL;

-- Change the primary key to a composite key (day, miner_id, client_id)
ALTER TABLE daily_deals DROP CONSTRAINT daily_deals_pkey;
ALTER TABLE daily_deals ADD PRIMARY KEY (day, miner_id, client_id);

ALTER TABLE daily_deals ADD COLUMN majority_found INT;
UPDATE daily_deals SET majority_found = total;
ALTER TABLE daily_deals ALTER COLUMN majority_found SET NOT NULL;

-- Note: backfilling `tested = total` is not entirely accurate:
-- * Before we introduced committees & majorities, tested = total
-- * After that change we started to calculate total = majority_found
ALTER TABLE daily_deals RENAME COLUMN total to tested;

ALTER TABLE daily_deals ADD COLUMN indexed_http INT;
-- We don't know how many of the deals tested in the past offered HTTP retrievals.
-- Historically, this value was between 1/7 to 1/3 of indexed deals.
-- I am using 1/5 as an approximation to give us more meaningful data than 0.
UPDATE daily_deals SET indexed_http = indexed/5;
ALTER TABLE daily_deals ALTER COLUMN indexed_http SET NOT NULL;

73 changes: 61 additions & 12 deletions test/public-stats.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ describe('public-stats', () => {
const allMeasurements = honestMeasurements
const committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements)

await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements })
await updatePublicStats({
createPgClient,
committees,
honestMeasurements,
allMeasurements,
findDealClients: (_minerId, _cid) => ['f0client']
})

const { rows: created } = await pgClient.query(
'SELECT day::TEXT, total, successful FROM retrieval_stats'
Expand All @@ -64,7 +70,13 @@ describe('public-stats', () => {
])

honestMeasurements.push({ ...VALID_MEASUREMENT, retrievalResult: 'UNKNOWN_ERROR' })
await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements })
await updatePublicStats({
createPgClient,
committees,
honestMeasurements,
allMeasurements,
findDealClients: (_minerId, _cid) => ['f0client']
})

const { rows: updated } = await pgClient.query(
'SELECT day::TEXT, total, successful FROM retrieval_stats'
Expand All @@ -84,8 +96,14 @@ describe('public-stats', () => {
const allMeasurements = honestMeasurements
const committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements)

await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements })
await updatePublicStats({

createPgClient,
committees,
honestMeasurements,
allMeasurements,
findDealClients: (_minerId, _cid) => ['f0client']
})
const { rows: created } = await pgClient.query(
'SELECT day::TEXT, miner_id, total, successful FROM retrieval_stats'
)
Expand All @@ -96,7 +114,13 @@ describe('public-stats', () => {

honestMeasurements.push({ ...VALID_MEASUREMENT, minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' })
honestMeasurements.push({ ...VALID_MEASUREMENT, minerId: 'f1second', retrievalResult: 'UNKNOWN_ERROR' })
await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements })
await updatePublicStats({
createPgClient,
committees,
honestMeasurements,
allMeasurements,
findDealClients: (_minerId, _cid) => ['f0client']
})

const { rows: updated } = await pgClient.query(
'SELECT day::TEXT, miner_id, total, successful FROM retrieval_stats'
Expand All @@ -119,7 +143,13 @@ describe('public-stats', () => {
const allMeasurements = honestMeasurements
let committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements)

await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements })
await updatePublicStats({
createPgClient,
committees,
honestMeasurements,
allMeasurements,
findDealClients: (_minerId, _cid) => ['f0client']
})

const { rows: created } = await pgClient.query(
'SELECT day::TEXT, deals_tested, deals_advertising_http FROM indexer_query_stats'
Expand All @@ -135,7 +165,13 @@ describe('public-stats', () => {
honestMeasurements.push({ ...VALID_MEASUREMENT, cid: 'bafy4', indexerResult: 'UNKNOWN_ERROR' })
committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements)

await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements })
await updatePublicStats({
createPgClient,
committees,
honestMeasurements,
allMeasurements,
findDealClients: (_minerId, _cid) => ['f0client']
})

const { rows: updated } = await pgClient.query(
'SELECT day::TEXT, deals_tested, deals_advertising_http FROM indexer_query_stats'
Expand All @@ -147,6 +183,7 @@ describe('public-stats', () => {
})

describe('daily_deals', () => {
// TODO: add tests for miner_id, client_id, majority_found, indexed_http
it('creates or updates the row for today', async () => {
/** @type {Measurement[]} */
const honestMeasurements = [
Expand All @@ -159,13 +196,19 @@ describe('public-stats', () => {
const allMeasurements = honestMeasurements
let committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements)

await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements })
await updatePublicStats({
createPgClient,
committees,
honestMeasurements,
allMeasurements,
findDealClients: (_minerId, _cid) => ['f0client']
})

const { rows: created } = await pgClient.query(
'SELECT day::TEXT, total, indexed, retrievable FROM daily_deals'
'SELECT day::TEXT, tested, indexed, retrievable FROM daily_deals'
)
assert.deepStrictEqual(created, [
{ day: today, total: 4, indexed: 3, retrievable: 1 }
{ day: today, tested: 4, indexed: 3, retrievable: 1 }
])

// Notice: this measurement is for the same task as honestMeasurements[0], therefore it's
Expand All @@ -176,14 +219,20 @@ describe('public-stats', () => {
honestMeasurements.push({ ...VALID_MEASUREMENT, cid: 'bafy5', indexerResult: 'UNKNOWN_ERROR', retrievalResult: 'IPNI_UNKNOWN_ERROR' })
committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements)

await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements })
await updatePublicStats({
createPgClient,
committees,
honestMeasurements,
allMeasurements,
findDealClients: (_minerId, _cid) => ['f0client']
})

const { rows: updated } = await pgClient.query(
'SELECT day::TEXT, total, indexed, retrievable FROM daily_deals'
'SELECT day::TEXT, tested, indexed, retrievable FROM daily_deals'
)
assert.deepStrictEqual(updated, [{
day: today,
total: 2 * 4 + 1 /* added bafy5 */,
tested: 2 * 4 + 1 /* added bafy5 */,
indexed: 2 * 3 + 1 /* bafy5 is indexed */,
retrievable: 2 * 1 + 0 /* bafy5 not retrievable */
}])
Expand Down