Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Created daily_node_metrics table and received/stored station_id #188

Merged
merged 8 commits into from
Apr 30, 2024
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Set up [PostgreSQL](https://www.postgresql.org/) with default settings:
- Database: spark_stats

Alternatively, set the environment variable `$DATABASE_URL` with
`postgres://${USER}:${PASS}@${HOST}:${POST}/${DATABASE}`.
`postgres://${USER}:${PASS}@${HOST}:${PORT}/${DATABASE}`.

The Postgres user and database need to exist already, and the user
needs full management permissions for the database.
Expand Down
26 changes: 26 additions & 0 deletions lib/platform-stats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import createDebug from 'debug'

const debug = createDebug('spark:platform-stats')

/**
* @param {import('pg').Client} pgClient
* @param {import('./preprocess').Measurement[]} honestMeasurements
*/
export const updatePlatformStats = async (pgClient, honestMeasurements) => {
await updateDailyNodeMetrics(pgClient, honestMeasurements)
}

/**
* @param {import('pg').Client} pgClient
* @param {import('./preprocess').Measurement[]} honestMeasurements
*/
export const updateDailyNodeMetrics = async (pgClient, honestMeasurements) => {
debug('Updating daily node metrics, count=%s', honestMeasurements.length)
for (const m of honestMeasurements) {
bajtos marked this conversation as resolved.
Show resolved Hide resolved
await pgClient.query(`
INSERT INTO daily_node_metrics (station_id, metric_date)
VALUES ($1, now())
ON CONFLICT (station_id, metric_date) DO NOTHING
`, [m.stationId]) // TODO: when we add more fields, we should update the ON CONFLICT clause to update the fields
}
}
4 changes: 4 additions & 0 deletions lib/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class Measurement {
this.end_at = parseDateTime(m.end_at)
this.status_code = m.status_code
this.indexerResult = pointerize(m.indexer_result)
this.stationId = pointerize(m.station_id)
}
}

Expand Down Expand Up @@ -200,6 +201,9 @@ const assertValidMeasurement = measurement => {
assert(ethers.isAddress(measurement.participantAddress), 'valid participant address required')
assert(typeof measurement.inet_group === 'string', 'valid inet group required')
assert(typeof measurement.finished_at === 'number', 'field `finished_at` must be set to a number')
if (measurement.stationId) {
assert(typeof measurement.stationId === 'string' && measurement.stationId.length === 64, 'stationId must be a string of 64 characters')
bajtos marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
Expand Down
2 changes: 2 additions & 0 deletions lib/public-stats.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import assert from 'node:assert'
import createDebug from 'debug'

import { updatePlatformStats } from './platform-stats.js'
import { getTaskId } from './retrieval-stats.js'

const debug = createDebug('spark:public-stats')
Expand Down Expand Up @@ -31,6 +32,7 @@ export const updatePublicStats = async ({ createPgClient, honestMeasurements })
}
await updateDailyParticipants(pgClient, participants)
await updateIndexerQueryStats(pgClient, honestMeasurements)
await updatePlatformStats(pgClient, honestMeasurements)
} finally {
await pgClient.end()
}
Expand Down
5 changes: 5 additions & 0 deletions migrations/006.do.daily-node-metrics.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE daily_node_metrics (
bajtos marked this conversation as resolved.
Show resolved Hide resolved
metric_date DATE NOT NULL,
bajtos marked this conversation as resolved.
Show resolved Hide resolved
station_id TEXT NOT NULL,
PRIMARY KEY (metric_date, station_id)
)
2 changes: 2 additions & 0 deletions test/helpers/test-data.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export const VALID_PARTICIPANT_ADDRESS = '0x000000000000000000000000000000000000dEaD'
export const VALID_STATION_ID = '6400000000000000000000000000000000000000000000000000000000000000'

export const VALID_TASK = {
cid: 'QmUuEoBdjC8D1PfWZCc7JCSK8nj7TV6HbXWDHYHzZHCVGS',
Expand All @@ -13,6 +14,7 @@ export const VALID_MEASUREMENT = {
provider_address: '/dns4/production-ipfs-peer.pinata.cloud/tcp/3000/ws/p2p/Qma8ddFEQWEU8ijWvdxXm3nxU7oHsRtCykAaVz8WUYhiKn',
protocol: 'bitswap',
participantAddress: VALID_PARTICIPANT_ADDRESS,
stationId: VALID_STATION_ID,
inet_group: 'some-group-id',
status_code: 200,
timeout: false,
Expand Down
74 changes: 74 additions & 0 deletions test/platform-stats.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import assert from 'node:assert'
import pg from 'pg'
import { beforeEach, describe, it } from 'mocha'

import { DATABASE_URL } from '../lib/config.js'
import { migrateWithPgClient } from '../lib/migrate.js'
import { VALID_MEASUREMENT, VALID_STATION_ID } from './helpers/test-data.js'
import { updateDailyNodeMetrics } from '../lib/platform-stats.js'

const createPgClient = async () => {
const pgClient = new pg.Client({ connectionString: DATABASE_URL })
await pgClient.connect()
return pgClient
}

describe('platform-stats', () => {
let pgClient
before(async () => {
pgClient = await createPgClient()
await migrateWithPgClient(pgClient)
})

let today
beforeEach(async () => {
await pgClient.query('DELETE FROM daily_node_metrics')
await pgClient.query('BEGIN TRANSACTION')
bajtos marked this conversation as resolved.
Show resolved Hide resolved
today = await getCurrentDate()
})

afterEach(async () => {
await pgClient.query('END TRANSACTION')
})

after(async () => {
await pgClient.end()
})

describe('updateDailyNodeMetrics', () => {
it('updates daily node metrics for today with multiple measurements', async () => {
const validStationId2 = VALID_STATION_ID.slice(0, -1) + '1'
const honestMeasurements = [
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID },
{ ...VALID_MEASUREMENT, stationId: validStationId2 }
]

await updateDailyNodeMetrics(pgClient, honestMeasurements)

const { rows } = await pgClient.query('SELECT station_id, metric_date::TEXT FROM daily_node_metrics ORDER BY station_id')
assert.strictEqual(rows.length, 2)
assert.deepStrictEqual(rows, [
{ station_id: VALID_STATION_ID, metric_date: today },
{ station_id: validStationId2, metric_date: today }
])
})

it('ignores duplicate measurements for the same station on the same day', async () => {
const honestMeasurements = [
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID },
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID }
]

await updateDailyNodeMetrics(pgClient, honestMeasurements)

const { rows } = await pgClient.query('SELECT station_id, metric_date::TEXT FROM daily_node_metrics')
assert.strictEqual(rows.length, 1)
assert.deepStrictEqual(rows, [{ station_id: VALID_STATION_ID, metric_date: today }])
})
})

const getCurrentDate = async () => {
const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today')
return today
}
})
4 changes: 4 additions & 0 deletions test/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Point } from '../lib/telemetry.js'
import assert from 'node:assert'
import createDebug from 'debug'
import { assertPointFieldValue, assertRecordedTelemetryPoint } from './helpers/assertions.js'
import { VALID_STATION_ID } from './helpers/test-data.js'
import { RoundData } from '../lib/round.js'

const debug = createDebug('test')
Expand All @@ -23,6 +24,7 @@ describe('preprocess', () => {
const roundIndex = 0
const measurements = [{
participant_address: 'f410ftgmzttyqi3ti4nxbvixa4byql3o5d4eo3jtc43i',
station_id: VALID_STATION_ID,
spark_version: '1.2.3',
inet_group: 'ig1',
finished_at: '2023-11-01T09:00:00.000Z',
Expand All @@ -41,6 +43,7 @@ describe('preprocess', () => {
assert.deepStrictEqual(round.measurements, [
new Measurement({
participant_address: '0x999999cf1046e68e36E1aA2E0E07105eDDD1f08E',
station_id: VALID_STATION_ID,
spark_version: '1.2.3',
inet_group: 'ig1',
finished_at: '2023-11-01T09:00:00.000Z',
Expand Down Expand Up @@ -97,6 +100,7 @@ describe('getRetrievalResult', () => {
spark_version: '1.5.2',
zinnia_version: '0.14.0',
participant_address: 'f410fgkhpcrbmdvic52o3nivftrjxr7nzw47updmuzra',
station_id: VALID_STATION_ID,
finished_at: '2023-11-01T09:42:03.246Z',
timeout: false,
start_at: '2023-11-01T09:40:03.393Z',
Expand Down