Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: per-miner & per-client daily deal stats #336

Merged
merged 16 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,14 @@ export const evaluate = async ({

if (createPgClient) {
try {
await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements: measurements })
await updatePublicStats({
createPgClient,
committees,
honestMeasurements,
allMeasurements: measurements,
findDealClients: (minerId, cid) => sparkRoundDetails.retrievalTasks
.find(t => t.cid === cid && t.minerId === minerId)?.clients
})
} catch (err) {
console.error('Cannot update public stats.', err)
ignoredErrors.push(err)
Expand Down
149 changes: 124 additions & 25 deletions lib/public-stats.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as Sentry from '@sentry/node'
import createDebug from 'debug'

import { updatePlatformStats } from './platform-stats.js'
Expand All @@ -14,8 +15,9 @@ const debug = createDebug('spark:public-stats')
* @param {Iterable<Committee>} args.committees
* @param {import('./preprocess.js').Measurement[]} args.honestMeasurements
* @param {import('./preprocess.js').Measurement[]} args.allMeasurements
* @param {(minerId: string, cid: string) => (string[] | undefined)} args.findDealClients
*/
export const updatePublicStats = async ({ createPgClient, committees, honestMeasurements, allMeasurements }) => {
export const updatePublicStats = async ({ createPgClient, committees, honestMeasurements, allMeasurements, findDealClients }) => {
/** @type {Map<string, {total: number, successful: number}>} */
const minerRetrievalStats = new Map()
for (const c of committees) {
Expand All @@ -35,7 +37,7 @@ export const updatePublicStats = async ({ createPgClient, committees, honestMeas
await updateRetrievalStats(pgClient, minerId, retrievalStats)
}
await updateIndexerQueryStats(pgClient, committees)
await updateDailyDealsStats(pgClient, committees)
await updateDailyDealsStats(pgClient, committees, findDealClients)
await updatePlatformStats(pgClient, honestMeasurements, allMeasurements)
} finally {
await pgClient.end()
Expand Down Expand Up @@ -104,37 +106,134 @@ const updateIndexerQueryStats = async (pgClient, committees) => {
/**
 * Aggregate per-(miner, client) deal statistics from the evaluated committees
 * and add them to today's rows in the `daily_deals` table.
 *
 * Every committee contributes once to each client returned by
 * `findDealClients` for the committee's retrieval task. When no clients are
 * found, the deal is recorded under the placeholder client id `no-clients`
 * and the problem is reported to Sentry.
 *
 * @param {pg.Client} pgClient
 * @param {Iterable<Committee>} committees
 * @param {(minerId: string, cid: string) => (string[] | undefined)} findDealClients
 */
const updateDailyDealsStats = async (pgClient, committees, findDealClients) => {
  /** @type {Map<string, Map<string, {
   * tested: number;
   * index_majority_found: number;
   * retrieval_majority_found: number;
   * indexed: number;
   * indexed_http: number;
   * retrievable: number;
   * }>>} */
  const minerClientDealStats = new Map()

  for (const c of committees) {
    const { minerId, cid } = c.retrievalTask
    let clients = findDealClients(minerId, cid)
    if (!clients || !clients.length) {
      console.warn(`Invalid retrieval task (${minerId}, ${cid}): no deal clients found. Using a placeholder value 'no-clients'.`)
      clients = ['no-clients']
      Sentry.captureException(new Error('Invalid retrieval task: no deal clients found.'), {
        extra: {
          minerId,
          cid
        }
      })
    }

    let clientDealStats = minerClientDealStats.get(minerId)
    if (!clientDealStats) {
      clientDealStats = new Map()
      minerClientDealStats.set(minerId, clientDealStats)
    }

    // The evaluation belongs to the committee, not to the client: look it up once.
    const evaluation = c.evaluation

    for (const clientId of clients) {
      let stats = clientDealStats.get(clientId)
      if (!stats) {
        stats = {
          tested: 0,
          index_majority_found: 0,
          retrieval_majority_found: 0,
          indexed: 0,
          indexed_http: 0,
          retrievable: 0
        }
        clientDealStats.set(clientId, stats)
      }

      // A deal counts as tested even when the committee has no evaluation.
      stats.tested++
      if (!evaluation) continue

      if (evaluation.indexerResult !== 'COMMITTEE_TOO_SMALL' && evaluation.indexerResult !== 'MAJORITY_NOT_FOUND') {
        stats.index_majority_found++
      }

      if (evaluation.indexerResult === 'OK' || evaluation.indexerResult === 'HTTP_NOT_ADVERTISED') {
        stats.indexed++
      }

      if (evaluation.indexerResult === 'OK') {
        stats.indexed_http++
      }

      // Both conditions must inspect `retrievalResult`; checking `indexerResult`
      // here would let a retrieval 'MAJORITY_NOT_FOUND' count as a found majority.
      if (evaluation.retrievalResult !== 'COMMITTEE_TOO_SMALL' && evaluation.retrievalResult !== 'MAJORITY_NOT_FOUND') {
        stats.retrieval_majority_found++
      }

      if (evaluation.retrievalResult === 'OK') {
        stats.retrievable++
      }
    }
  }

  // Convert the nested map to a flat array of rows for the query
  const flatStats = Array.from(minerClientDealStats.entries()).flatMap(
    ([minerId, clientDealStats]) => Array.from(clientDealStats.entries()).map(
      ([clientId, stats]) => ({ minerId, clientId, ...stats })
    )
  )

  // The totals are only needed for the debug log; skip the work when it is disabled.
  if (debug.enabled) {
    debug(
      'Updating public stats - daily deals: tested += %s index_majority_found += %s indexed += %s retrieval_majority_found += %s retrievable += %s',
      flatStats.reduce((sum, stat) => sum + stat.tested, 0),
      flatStats.reduce((sum, stat) => sum + stat.index_majority_found, 0),
      flatStats.reduce((sum, stat) => sum + stat.indexed, 0),
      flatStats.reduce((sum, stat) => sum + stat.retrieval_majority_found, 0),
      flatStats.reduce((sum, stat) => sum + stat.retrievable, 0)
    )
  }

  // Each parameter is a column-wise array; `unnest` expands them into one row
  // per (miner, client) pair. Existing rows for the same day are incremented.
  await pgClient.query(`
    INSERT INTO daily_deals (
      day,
      miner_id,
      client_id,
      tested,
      index_majority_found,
      indexed,
      indexed_http,
      retrieval_majority_found,
      retrievable
    ) VALUES (
      now(),
      unnest($1::text[]),
      unnest($2::text[]),
      unnest($3::int[]),
      unnest($4::int[]),
      unnest($5::int[]),
      unnest($6::int[]),
      unnest($7::int[]),
      unnest($8::int[])
    )
    ON CONFLICT(day, miner_id, client_id) DO UPDATE SET
      tested = daily_deals.tested + EXCLUDED.tested,
      index_majority_found = daily_deals.index_majority_found + EXCLUDED.index_majority_found,
      indexed = daily_deals.indexed + EXCLUDED.indexed,
      indexed_http = daily_deals.indexed_http + EXCLUDED.indexed_http,
      retrieval_majority_found = daily_deals.retrieval_majority_found + EXCLUDED.retrieval_majority_found,
      retrievable = daily_deals.retrievable + EXCLUDED.retrievable
  `, [
    flatStats.map(stat => stat.minerId),
    flatStats.map(stat => stat.clientId),
    flatStats.map(stat => stat.tested),
    flatStats.map(stat => stat.index_majority_found),
    flatStats.map(stat => stat.indexed),
    flatStats.map(stat => stat.indexed_http),
    flatStats.map(stat => stat.retrieval_majority_found),
    flatStats.map(stat => stat.retrievable)
  ])
}
1 change: 1 addition & 0 deletions lib/typings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export {
export interface RetrievalTask {
cid: string;
minerId: string;
clients?: string[];
bajtos marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
41 changes: 41 additions & 0 deletions migrations/015.do.deal-retrievability-score.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- Migration: break the daily deal statistics down per miner and per client.
--
-- For each (day, miner_id, client_id), we want to know the following numbers (counts):
-- * `tested`: (NEW) total deals tested
-- * `indexed`: deals announcing retrievals to IPNI (HTTP or Graphsync retrievals)
-- * `indexed_http`: (NEW) deals announcing HTTP retrievals to IPNI
-- * `index_majority_found`: (NEW) deals where we found a majority agreeing on the same indexer result
-- * `retrieval_majority_found`: (NEW) deals where we found a majority agreeing on the same retrieval result
-- * `retrievable`: deals where the majority agrees the content can be retrieved

-- Rows that already exist have no per-miner breakdown; label them with the
-- placeholder 'all-combined' so that the column can be made NOT NULL.
ALTER TABLE daily_deals ADD COLUMN miner_id TEXT;
UPDATE daily_deals SET miner_id = 'all-combined';
ALTER TABLE daily_deals ALTER COLUMN miner_id SET NOT NULL;

-- Same approach for the per-client breakdown.
ALTER TABLE daily_deals ADD COLUMN client_id TEXT;
UPDATE daily_deals SET client_id = 'all-combined';
ALTER TABLE daily_deals ALTER COLUMN client_id SET NOT NULL;

-- Change the primary key to a composite key (day, miner_id, client_id)
ALTER TABLE daily_deals DROP CONSTRAINT daily_deals_pkey;
ALTER TABLE daily_deals ADD PRIMARY KEY (day, miner_id, client_id);

CREATE INDEX daily_deals_day ON daily_deals (day);

-- Backfill both majority counters from the legacy `total` column.
-- These statements must run before `total` is renamed to `tested` below.
ALTER TABLE daily_deals ADD COLUMN retrieval_majority_found INT;
UPDATE daily_deals SET retrieval_majority_found = total;
ALTER TABLE daily_deals ALTER COLUMN retrieval_majority_found SET NOT NULL;

ALTER TABLE daily_deals ADD COLUMN index_majority_found INT;
UPDATE daily_deals SET index_majority_found = total;
ALTER TABLE daily_deals ALTER COLUMN index_majority_found SET NOT NULL;

-- Note: backfilling `tested = total` is not entirely accurate:
-- * Before we introduced committees & majorities, tested = total
-- * After that change, we started to calculate total = majority_found
ALTER TABLE daily_deals RENAME COLUMN total to tested;

ALTER TABLE daily_deals ADD COLUMN indexed_http INT;
-- We don't know how many of the deals tested in the past offered HTTP retrievals.
-- Historically, this value was between 1/7 and 1/3 of indexed deals.
-- I am using 1/5 as an approximation to give us more meaningful data than 0.
UPDATE daily_deals SET indexed_http = indexed/5;
ALTER TABLE daily_deals ALTER COLUMN indexed_http SET NOT NULL;

Loading