diff --git a/.github/workflows/cron.yml b/.github/workflows/cron.yml new file mode 100644 index 0000000000..f566933403 --- /dev/null +++ b/.github/workflows/cron.yml @@ -0,0 +1,23 @@ +name: cron +on: + push: + branches: + - main + paths: + - 'packages/cron/**' + - '.github/workflows/cron.yml' + pull_request: + paths: + - 'packages/cron/**' + - '.github/workflows/cron.yml' +jobs: + test: + runs-on: ubuntu-latest + name: Test + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v2 + with: + node-version: 16 + - uses: bahmutov/npm-install@v1 + - run: npm test --workspace packages/cron \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index b8aeaf75cc..736e7395fd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5537,7 +5537,6 @@ "version": "2.0.0", "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz", "integrity": "sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw==", - "dev": true, "engines": { "node": ">=4" } @@ -15575,8 +15574,7 @@ "node_modules/packet-reader": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/packet-reader/-/packet-reader-1.0.0.tgz", - "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ==", - "dev": true + "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ==" }, "node_modules/pako": { "version": "1.0.11", @@ -15744,7 +15742,6 @@ "version": "8.7.1", "resolved": "https://registry.npmjs.org/pg/-/pg-8.7.1.tgz", "integrity": "sha512-7bdYcv7V6U3KAtWjpQJJBww0UEsWuh4yQ/EjNf2HeO/NnvKjpvhEIe/A/TleP6wtmSKnUnghs5A9jUoK6iDdkA==", - "dev": true, "dependencies": { "buffer-writer": "2.0.0", "packet-reader": "1.0.0", @@ -15769,14 +15766,12 @@ "node_modules/pg-connection-string": { "version": "2.5.0", "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz", - "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==", - "dev": true + "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==" }, "node_modules/pg-int8": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==", - "dev": true, "engines": { "node": ">=4.0.0" } @@ -15785,7 +15780,6 @@ "version": "3.4.1", "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.4.1.tgz", "integrity": "sha512-TVHxR/gf3MeJRvchgNHxsYsTCHQ+4wm3VIHSS19z8NC0+gioEhq1okDY1sm/TYbfoP6JLFx01s0ShvZ3puP/iQ==", - "dev": true, "peerDependencies": { "pg": ">=8.0" } @@ -15793,14 +15787,12 @@ "node_modules/pg-protocol": { "version": "1.5.0", "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.5.0.tgz", - "integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==", - "dev": true + "integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==" }, "node_modules/pg-types": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", - "dev": true, "dependencies": { "pg-int8": "1.0.1", "postgres-array": "~2.0.0", @@ -15816,7 +15808,6 @@ "version": "1.0.4", "resolved": 
"https://registry.npmjs.org/pgpass/-/pgpass-1.0.4.tgz", "integrity": "sha512-YmuA56alyBq7M59vxVBfPJrGSozru8QAdoNlWuW3cz8l+UX3cWge0vTvjKhsSHSJpo3Bom8/Mm6hf0TR5GY0+w==", - "dev": true, "dependencies": { "split2": "^3.1.1" } @@ -16472,7 +16463,6 @@ "version": "2.0.0", "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==", - "dev": true, "engines": { "node": ">=4" } @@ -16481,7 +16471,6 @@ "version": "1.0.0", "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz", "integrity": "sha1-AntTPAqokOJtFy1Hz5zOzFIazTU=", - "dev": true, "engines": { "node": ">=0.10.0" } @@ -16490,7 +16479,6 @@ "version": "1.0.7", "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==", - "dev": true, "engines": { "node": ">=0.10.0" } @@ -16499,7 +16487,6 @@ "version": "1.2.0", "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", - "dev": true, "dependencies": { "xtend": "^4.0.0" }, @@ -21411,11 +21398,15 @@ "limiter": "2.0.1", "node-fetch": "^2.6.1", "p-retry": "^4.6.1", + "pg": "^8.7.1", "piggybacker": "^2.0.0" }, "devDependencies": { "@types/node": "^16.3.1", - "npm-run-all": "^4.1.5" + "execa": "^5.1.1", + "mocha": "^8.3.2", + "npm-run-all": "^4.1.5", + "smoke": "^3.1.1" } }, "packages/cron/node_modules/dotenv": { @@ -25142,11 +25133,15 @@ "@web3-storage/db": "^2.0.0", "debug": "^4.3.1", "dotenv": "^9.0.2", + "execa": "^5.1.1", "limiter": "2.0.1", + "mocha": "^8.3.2", "node-fetch": "^2.6.1", "npm-run-all": "^4.1.5", "p-retry": "^4.6.1", - "piggybacker": "^2.0.0" + "pg": "^8.7.1", + "piggybacker": "^2.0.0", + "smoke": "^3.1.1" }, "dependencies": { "dotenv": { @@ -26578,8 +26573,7 @@ "buffer-writer": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz", - "integrity": "sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw==", - "dev": true + "integrity": "sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw==" }, "buffer-xor": { "version": "1.0.3", @@ -34307,8 +34301,7 @@ "packet-reader": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/packet-reader/-/packet-reader-1.0.0.tgz", - "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ==", - "dev": true + "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ==" }, "pako": { "version": "1.0.11", @@ -34449,7 +34442,6 @@ "version": "8.7.1", "resolved": "https://registry.npmjs.org/pg/-/pg-8.7.1.tgz", "integrity": "sha512-7bdYcv7V6U3KAtWjpQJJBww0UEsWuh4yQ/EjNf2HeO/NnvKjpvhEIe/A/TleP6wtmSKnUnghs5A9jUoK6iDdkA==", - "dev": true, "requires": { "buffer-writer": "2.0.0", "packet-reader": "1.0.0", @@ -34463,33 +34455,28 @@ "pg-connection-string": { "version": "2.5.0", "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz", - "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==", - "dev": true + "integrity": 
"sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==" }, "pg-int8": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", - "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==", - "dev": true + "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==" }, "pg-pool": { "version": "3.4.1", "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.4.1.tgz", "integrity": "sha512-TVHxR/gf3MeJRvchgNHxsYsTCHQ+4wm3VIHSS19z8NC0+gioEhq1okDY1sm/TYbfoP6JLFx01s0ShvZ3puP/iQ==", - "dev": true, "requires": {} }, "pg-protocol": { "version": "1.5.0", "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.5.0.tgz", - "integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==", - "dev": true + "integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==" }, "pg-types": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", - "dev": true, "requires": { "pg-int8": "1.0.1", "postgres-array": "~2.0.0", @@ -34502,7 +34489,6 @@ "version": "1.0.4", "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.4.tgz", "integrity": "sha512-YmuA56alyBq7M59vxVBfPJrGSozru8QAdoNlWuW3cz8l+UX3cWge0vTvjKhsSHSJpo3Bom8/Mm6hf0TR5GY0+w==", - "dev": true, "requires": { "split2": "^3.1.1" } @@ -34949,26 +34935,22 @@ "postgres-array": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", - "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==", - "dev": true + "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==" }, "postgres-bytea": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz", - "integrity": "sha1-AntTPAqokOJtFy1Hz5zOzFIazTU=", - "dev": true + "integrity": "sha1-AntTPAqokOJtFy1Hz5zOzFIazTU=" }, "postgres-date": { "version": "1.0.7", "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", - "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==", - "dev": true + "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==" }, "postgres-interval": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", - "dev": true, "requires": { "xtend": "^4.0.0" } diff --git a/packages/cron/README.md b/packages/cron/README.md index 071b248909..a9af52a8c8 100644 --- a/packages/cron/README.md +++ b/packages/cron/README.md @@ -15,8 +15,22 @@ To run this locally you will need the following in your `packages/cron/.env` fil ```ini ENV=dev +DATABASE=postgres + +# PostgREST API URL +DEV_PG_REST_URL=http://localhost:3000 +# PostgREST API token, for role "postgres", using secret value PGRST_JWT_SECRET from './postgres/docker/docker-compose.yml' +# https://postgrest.org/en/v8.0/tutorials/tut1.html#step-3-sign-a-token 
+DEV_PG_REST_JWT=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoicG9zdGdyZXMifQ.oM0SXF31Vs1nfwCaDxjlczE237KcNKhTpKEYxMX-jEU + +# Connection string for locally running postgres used in tests +DEV_PG_CONNECTION=postgres://postgres:postgres@127.0.0.1:5432/postgres + +# Cluster CLUSTER_API_URL=http://127.0.0.1:9094/ CLUSTER_IPFS_PROXY_API_URL=http://127.0.0.1:9095/api/v0/ + +# Fauna DEV_FAUNA_KEY="" ``` @@ -31,25 +45,3 @@ Run the job: ```sh npm run start:pins ``` - -### pinata - -Fetch the oldest 600 PinRequests from the DB and pin them on PiΓ±ata - -To run this locally you will need the following in your `packages/cron/.env` file: - -```ini -ENV=dev -DEV_FAUNA_KEY="" -PINATA_JWT="" -``` - -You also need to have: - -- a dev account and db set up on FaunaDB with the latest schema imported as per [../db/README.md](../db/README.md) - -Run the job: - -```sh -npm run start:pinata -``` \ No newline at end of file diff --git a/packages/cron/package.json b/packages/cron/package.json index bfa4a6095b..f102fb2fde 100644 --- a/packages/cron/package.json +++ b/packages/cron/package.json @@ -9,7 +9,11 @@ "start": "run-s start:*", "start:metrics": "node src/bin/metrics.js", "start:pins": "node src/bin/pins.js", - "start:pinata": "node src/bin/pinata.js" + "start:dagcargo:views": "NODE_TLS_REJECT_UNAUTHORIZED=0 node src/bin/dagcargo-views.js", + "test": "npm-run-all -p -r mock:cluster mock:pgrest test:e2e", + "test:e2e": "mocha test/*.spec.js --exit", + "mock:cluster": "smoke -p 9094 test/mocks/cluster", + "mock:pgrest": "smoke -p 9087 test/mocks/pgrest" }, "author": "Alan Shaw", "license": "(Apache-2.0 AND MIT)", @@ -20,12 +24,16 @@ "debug": "^4.3.1", "dotenv": "^9.0.2", "limiter": "2.0.1", + "pg": "^8.7.1", "node-fetch": "^2.6.1", "p-retry": "^4.6.1", "piggybacker": "^2.0.0" }, "devDependencies": { "@types/node": "^16.3.1", - "npm-run-all": "^4.1.5" + "execa": "^5.1.1", + "mocha": "^8.3.2", + "npm-run-all": "^4.1.5", + "smoke": "^3.1.1" } } diff --git a/packages/cron/src/bin/dagcargo-views.js b/packages/cron/src/bin/dagcargo-views.js new file mode 100644 index 0000000000..48e51bf31d --- /dev/null +++ b/packages/cron/src/bin/dagcargo-views.js @@ -0,0 +1,19 @@ +#!/usr/bin/env node + +import dotenv from 'dotenv' +import { refreshMaterializedViews } from '../jobs/dagcargo.js' +import { getPg } from '../lib/utils.js' + +async function main () { + const pg = getPg(process.env) + await pg.connect() + + try { + await refreshMaterializedViews({ pg }) + } finally { + await pg.end() + } +} + +dotenv.config() +main() diff --git a/packages/cron/src/jobs/dagcargo.js b/packages/cron/src/jobs/dagcargo.js new file mode 100644 index 0000000000..c910ffd7d6 --- /dev/null +++ b/packages/cron/src/jobs/dagcargo.js @@ -0,0 +1,26 @@ +import debug from 'debug' + +/** + * Refreshes the materialized views. 
+ * + * @param {{ pg: import('pg').Client }} config + */ +export async function refreshMaterializedViews ({ pg }) { + const log = debug('dagcargo:refreshMaterializedViews') + if (!log.enabled) { + console.log( + 'ℹ️ Enable logging by setting DEBUG=dagcargo:refreshMaterializedViews' + ) + } + + log('🔁 REFRESH MATERIALIZED VIEW CONCURRENTLY public.deal;') + await pg.query('REFRESH MATERIALIZED VIEW CONCURRENTLY public.deal;') + + log('🔁 REFRESH MATERIALIZED VIEW CONCURRENTLY public.aggregate;') + await pg.query('REFRESH MATERIALIZED VIEW CONCURRENTLY public.aggregate;') + + log('🔁 REFRESH MATERIALIZED VIEW CONCURRENTLY public.aggregate_entry;') + await pg.query('REFRESH MATERIALIZED VIEW CONCURRENTLY public.aggregate_entry;') + + log('✅ Done') +} diff --git a/packages/cron/src/jobs/pins.js b/packages/cron/src/jobs/pins.js index 61bf677ad0..93f9b508d3 100644 --- a/packages/cron/src/jobs/pins.js +++ b/packages/cron/src/jobs/pins.js @@ -1,75 +1,18 @@ import debug from 'debug' -import { gql } from '@web3-storage/db' import { toPinStatusEnum } from '@web3-storage/api/src/utils/pin.js' import retry from 'p-retry' import { piggyback } from 'piggybacker' +const MAX_PIN_REQUESTS_PER_RUN = 600 const log = debug('pins:updatePinStatuses') -const FIND_PIN_SYNC_REQUESTS = gql` - query FindPinSyncRequests($to: Time, $after: String) { - findPinSyncRequests(to: $to, _size: 1000, _cursor: $after) { - data { - _id - pin { - _id - content { - _id - cid - dagSize - } - location { - peerId - } - status - created - } - } - after - } - } -` - -const UPDATE_PINS = gql` - mutation UpdatePins($pins: [UpdatePinInput!]!) { - updatePins(pins: $pins) { - _id - } - } -` - -const UPDATE_CONTENT_DAG_SIZE = gql` - mutation UpdateContentDagSize($content: ID!, $dagSize: Long!) { - updateContentDagSize(content: $content, dagSize: $dagSize) { - _id - } - } -` - -const DELETE_PIN_SYNC_REQUESTS = gql` - mutation DeletePinSyncRequests($requests: [ID!]!) { - deletePinSyncRequests(requests: $requests) { - _id - } - } -` - -const CREATE_PIN_SYNC_REQUESTS = gql` - mutation CreatePinSyncRequests($pins: [ID!]!) { - createPinSyncRequests(pins: $pins) { - _id - } - } -` - /** * @param {{ * cluster: import('@nftstorage/ipfs-cluster').Cluster * db: import('@web3-storage/db').DBClient - * ipfs: import('../lib/ipfs').IPFS * }} config */ -export async function updatePinStatuses ({ cluster, db, ipfs }) { +export async function updatePinStatuses ({ cluster, db }) { if (!log.enabled) { console.log('ℹ️ Enable logging by setting DEBUG=pins:updatePinStatuses') } @@ -78,10 +21,6 @@ export async function updatePinStatuses ({ cluster, db, ipfs }) { // multiple times about the same CID. /** @type {Map} */ const statusCache = new Map() - // List of CIDs that we already updated the DAG size for and don't need to do - // get the size or update again.
- /** @type {Set} */ - const updatedDagSizes = new Set() const getPinStatus = piggyback( async cid => { @@ -99,18 +38,26 @@ export async function updatePinStatuses ({ cluster, db, ipfs }) { ) const to = new Date().toISOString() + const size = MAX_PIN_REQUESTS_PER_RUN let queryRes, after let i = 0 while (true) { - queryRes = await retry(() => db.query(FIND_PIN_SYNC_REQUESTS, { to, after }), { onFailedAttempt: log }) - const requests = queryRes.findPinSyncRequests.data + queryRes = await retry(() => db.getPinSyncRequests({ to, after, size }), { onFailedAttempt: log }) + + const requests = queryRes.data log(`📥 Processing ${i} -> ${i + requests.length}`) - const checkDagSizePins = [] const reSyncPins = [] let pinUpdates = await Promise.all(requests.map(async req => { const { pin } = req - const peerMap = await getPinStatus(pin.content.cid) + let peerMap + + try { + peerMap = await getPinStatus(pin.contentCid) + } catch (err) { + reSyncPins.push(pin) + return null // Cluster could not find the content, please check later + } if (!peerMap[pin.location.peerId]) { return null // not tracked by our cluster @@ -123,59 +70,42 @@ export async function updatePinStatuses ({ cluster, db, ipfs }) { } if (status === pin.status) { - log(`🙅 ${pin.content.cid}@${pin.location.peerId}: No status change (${status})`) + log(`🙅 ${pin.contentCid}@${pin.location.peerId}: No status change (${status})`) return null } - if (status === 'Pinned' && !pin.content.dagSize && !updatedDagSizes.has(pin.content.cid)) { - checkDagSizePins.push(pin) - updatedDagSizes.add(pin.content.cid) - } + log(`📌 ${pin.contentCid}@${pin.location.peerId}: ${pin.status} => ${status}`) - log(`📌 ${pin.content.cid}@${pin.location.peerId}: ${pin.status} => ${status}`) - return { pin: pin._id, status: status } + return { + id: pin._id, + status: status, + content_cid: pin.contentCid, + pin_location_id: pin.location._id, + updated_at: new Date().toISOString() + } })) + pinUpdates = pinUpdates.filter(Boolean) log(`⏳ Updating ${pinUpdates.length} pins...`) if (pinUpdates.length) { - await retry(() => db.query(UPDATE_PINS, { - pins: pinUpdates - }), { onFailedAttempt: log }) + await retry(() => db.upsertPins(pinUpdates), { onFailedAttempt: log }) } log(`✅ Updated ${pinUpdates.filter(Boolean).length} pins...`) - log(`⏳ Re-queuing ${reSyncPins.length} pin sync requests...`) - if (reSyncPins.length) { - await retry(() => db.query(CREATE_PIN_SYNC_REQUESTS, { - pins: reSyncPins.map(p => p._id) - }), { onFailedAttempt: log }) - } - log(`✅ Re-queued ${reSyncPins.length} pin sync requests...`) - log(`⏳ Removing ${requests.length} pin sync requests...`) if (requests.length) { - await retry(() => db.query(DELETE_PIN_SYNC_REQUESTS, { - requests: requests.map(r => r._id) - }), { onFailedAttempt: log }) + await retry(() => db.deletePinSyncRequests(requests.map(r => r._id)), { onFailedAttempt: log }) } log(`✅ Removed ${requests.length} pin sync requests...`) - await Promise.all(checkDagSizePins.map(async pin => { - log(`⏳ ${pin.content.cid}: Querying DAG size...`) - let dagSize - try { - // Note: this will timeout for large DAGs - dagSize = await ipfs.dagSize(pin.content.cid, { timeout: 10 * 60000 }) - log(`🛄 ${pin.content.cid}@${pin.location.peerId}: ${dagSize} bytes`) - await retry(() => db.query(UPDATE_CONTENT_DAG_SIZE, { content: pin.content._id, dagSize }), { onFailedAttempt: log }) - } catch (err) { - log(`💥 ${pin.content.cid}@${pin.location.peerId}: Failed to update DAG size`) - log(err) - } - })) + log(`⏳ Re-queuing
${reSyncPins.length} pin sync requests...`) + if (reSyncPins.length) { + await retry(() => db.createPinSyncRequests(reSyncPins.map(p => p._id)), { onFailedAttempt: log }) + } + log(`✅ Re-queued ${reSyncPins.length} pin sync requests...`) - after = queryRes.findPinSyncRequests.after + after = queryRes.after if (!after) break i += requests.length } diff --git a/packages/cron/src/lib/utils.js b/packages/cron/src/lib/utils.js index e8caeeff5c..f04677cea3 100644 --- a/packages/cron/src/lib/utils.js +++ b/packages/cron/src/lib/utils.js @@ -1,3 +1,4 @@ +import pg from 'pg' import { Cluster } from '@nftstorage/ipfs-cluster' import { DBClient } from '@web3-storage/db' import { IPFS } from './ipfs.js' @@ -33,6 +34,14 @@ export function getClusterIPFSProxy (env) { * @param {Record} env */ export function getDBClient (env) { + if (env.DATABASE === 'postgres') { + return getDBPostgresClient(env) + } + + return getDBFaunaClient(env) +} + +function getDBFaunaClient (env) { let token if (env.ENV === 'production') { if (!env.FAUNA_KEY) throw new Error('missing FAUNA_KEY environment var') @@ -48,3 +57,43 @@ export function getDBClient (env) { } return new DBClient({ token }) } + +function getDBPostgresClient (env) { + let token, endpoint + if (env.ENV === 'production') { + if (!env.PG_REST_JWT) throw new Error('missing PG_REST_JWT environment var') + if (!env.PG_REST_URL) throw new Error('missing PG_REST_URL environment var') + token = env.PG_REST_JWT + endpoint = env.PG_REST_URL + } else if (env.ENV === 'staging') { + if (!env.STAGING_PG_REST_JWT) throw new Error('missing STAGING_PG_REST_JWT environment var') + if (!env.STAGING_PG_REST_URL) throw new Error('missing STAGING_PG_REST_URL environment var') + token = env.STAGING_PG_REST_JWT + endpoint = env.STAGING_PG_REST_URL + } else if (env.ENV === 'dev') { + if (!env.DEV_PG_REST_JWT) throw new Error('missing DEV_PG_REST_JWT environment var') + if (!env.DEV_PG_REST_URL) throw new Error('missing DEV_PG_REST_URL environment var') + token = env.DEV_PG_REST_JWT + endpoint = env.DEV_PG_REST_URL + } else { + throw new Error(`unsupported environment ${env.ENV}`) + } + return new DBClient({ token, endpoint, postgres: true }) +} + +/** + * Create a new Postgres client instance from the passed environment variables.
+ * @param {Record} env + */ +export function getPg (env) { + let connectionString + if (env.ENV === 'production') { + connectionString = env.PROD_PG_CONNECTION + } else if (env.ENV === 'staging') { + connectionString = env.STAGING_PG_CONNECTION + } else { + connectionString = env.PG_CONNECTION + } + if (!connectionString) throw new Error('missing Postgres connection string') + return new pg.Client({ connectionString }) +} diff --git a/packages/cron/test/mocks/cluster/get_pins#@cid.js b/packages/cron/test/mocks/cluster/get_pins#@cid.js new file mode 100644 index 0000000000..fbaf54bb78 --- /dev/null +++ b/packages/cron/test/mocks/cluster/get_pins#@cid.js @@ -0,0 +1,21 @@ +/** + * https://github.com/sinedied/smoke#javascript-mocks + * @param {{ params: Record }} request + */ +module.exports = async ({ params }) => { + return { + statusCode: 200, + headers: { 'Content-Type': 'application/json' }, + body: { + cid: { '/': params.cid }, + name: 'test-pin-name', + peer_map: { + 'test-peer-id': { + peername: 'test-peer-name', + status: 'pinned', + timestamp: new Date().toISOString() + } + } + } + } +} diff --git a/packages/cron/test/mocks/package.json b/packages/cron/test/mocks/package.json new file mode 100644 index 0000000000..024ddd1265 --- /dev/null +++ b/packages/cron/test/mocks/package.json @@ -0,0 +1,5 @@ +{ + "name": "mocks", + "version": "1.0.0", + "description": "just here to fix cjs loading" +} diff --git a/packages/cron/test/mocks/pgrest/delete_pin_sync_request.json b/packages/cron/test/mocks/pgrest/delete_pin_sync_request.json new file mode 100644 index 0000000000..98470228c4 --- /dev/null +++ b/packages/cron/test/mocks/pgrest/delete_pin_sync_request.json @@ -0,0 +1,3 @@ +[{ + "id": 1 +}] \ No newline at end of file diff --git a/packages/cron/test/mocks/pgrest/get_pin_sync_request.json b/packages/cron/test/mocks/pgrest/get_pin_sync_request.json new file mode 100644 index 0000000000..a69b83d418 --- /dev/null +++ b/packages/cron/test/mocks/pgrest/get_pin_sync_request.json @@ -0,0 +1,62 @@ +[ + { + "_id": 1, + "pin": { + "_id": 1, + "status": "Pinning", + "contentCid": "bafy1", + "created:": "2021-07-14T19:27:14.934572Z", + "location": { + "_id": 1, + "peerId": "test-peer-id", + "peerName": "test-peer-name", + "region": "region" + } + } + }, + { + "_id": 2, + "pin": { + "_id": 2, + "status": "Pinning", + "contentCid": "bafy1", + "created:": "2021-07-14T19:27:14.934572Z", + "location": { + "_id": 2, + "peerId": "test-peer-id-2", + "peerName": "test-peer-name-2", + "region": "region" + } + } + }, + { + "_id": 3, + "pin": { + "_id": 3, + "status": "Pinning", + "contentCid": "bafy3", + "created:": "2021-07-14T19:27:14.934572Z", + "location": { + "_id": 1, + "peerId": "test-peer-id", + "peerName": "test-peer-name", + "region": "region" + } + } + }, + { + "_id": 4, + "pin": { + "_id": 4, + "status": "Pinning", + "contentCid": "bafy4", + "created:": "2021-07-14T19:27:14.934572Z", + "location": { + "_id": 1, + "peerId": "test-peer-id", + "peerName": "test-peer-name", + "region": "region" + } + } + } +] \ No newline at end of file diff --git a/packages/cron/test/mocks/pgrest/post_pin.json b/packages/cron/test/mocks/pgrest/post_pin.json new file mode 100644 index 0000000000..95fc8f9b9a --- /dev/null +++ b/packages/cron/test/mocks/pgrest/post_pin.json @@ -0,0 +1,3 @@ +{ + "id": 1 +} \ No newline at end of file diff --git a/packages/cron/test/mocks/pgrest/post_pin_sync_request.json b/packages/cron/test/mocks/pgrest/post_pin_sync_request.json new file mode 100644 index 0000000000..95fc8f9b9a --- 
/dev/null +++ b/packages/cron/test/mocks/pgrest/post_pin_sync_request.json @@ -0,0 +1,3 @@ +{ + "id": 1 +} \ No newline at end of file diff --git a/packages/cron/test/pins.spec.js b/packages/cron/test/pins.spec.js new file mode 100644 index 0000000000..422b0a5fcf --- /dev/null +++ b/packages/cron/test/pins.spec.js @@ -0,0 +1,33 @@ +/* eslint-env mocha */ +import execa from 'execa' +import assert from 'assert' + +const env = { + DEBUG: '*', + ENV: 'dev', + CLUSTER_API_URL: 'http://localhost:9094', + CLUSTER_IPFS_PROXY_API_URL: 'http://127.0.0.1:9095/api/v0/', + CLUSTER_BASIC_AUTH_TOKEN: 'test', + DATABASE: 'postgres', + DEV_PG_REST_URL: 'http://localhost:9087', + DEV_PG_REST_JWT: 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJzdXBhYmFzZSIsImlhdCI6MTYwMzk2ODgzNCwiZXhwIjoyNTUwNjUzNjM0LCJyb2xlIjoic2VydmljZV9yb2xlIn0.necIJaiP7X2T2QjGeV-FhpkizcNTX8HjDDBAxpgQTEI' +} + +describe('cron - pins', () => { + it('can be executed', async () => { + const { stderr } = execa.sync('./src/bin/pins.js', { env }) + assert.match(stderr, /pins:updatePinStatuses 📥 Processing 0 -> 4/) + assert.match(stderr, /pins:updatePinStatuses ⏳ bafy1: Checking status/) + assert.match(stderr, /pins:updatePinStatuses ⏳ bafy3: Checking status/) + assert.match(stderr, /pins:updatePinStatuses ⏳ bafy4: Checking status/) + assert.match(stderr, /pins:updatePinStatuses 📌 bafy1@test-peer-id: Pinning => Pinned/) + assert.match(stderr, /pins:updatePinStatuses 📌 bafy4@test-peer-id: Pinning => Pinned/) + assert.match(stderr, /pins:updatePinStatuses 📌 bafy3@test-peer-id: Pinning => Pinned/) + assert.match(stderr, /pins:updatePinStatuses ⏳ Updating 3 pins/) + assert.match(stderr, /pins:updatePinStatuses ✅ Updated 3 pins/) + assert.match(stderr, /pins:updatePinStatuses ⏳ Removing 4 pin sync requests/) + assert.match(stderr, /pins:updatePinStatuses ✅ Removed 4 pin sync requests/) + assert.match(stderr, /pins:updatePinStatuses ⏳ Re-queuing 0 pin sync requests/) + assert.match(stderr, /pins:updatePinStatuses ✅ Re-queued 0 pin sync requests/) + }) +}) diff --git a/packages/db/db-client-types.ts b/packages/db/db-client-types.ts index 0a01b15e52..3bb1d6acf3 100644 --- a/packages/db/db-client-types.ts +++ b/packages/db/db-client-types.ts @@ -60,6 +60,11 @@ export type AuthKeyItemOutput = { } // Pin +export type PinsUpsertInput = { + id: definitions['pin']['id'] + status: definitions['pin']['status'] +} + export type PinItem = { _id: definitions['pin']['id'] status: definitions['pin']['status'] @@ -83,6 +88,33 @@ export type PinItemOutput = { region: definitions['pin_location']['region'] } +export type PinRequestItemOutput = { + _id: definitions['pin_request']['id'] + cid: definitions['pin_request']['content_cid'] + created: definitions['pin_request']['inserted_at'] +} + +export type PinSyncRequestItem = { + _id: definitions['pin_sync_request']['id'] + pin: { + _id: definitions['pin']['id'] + status: definitions['pin']['status'] + contentCid: definitions['pin']['content_cid'] + created: definitions['pin']['inserted_at'] + location: { + _id?: definitions['pin_location']['id'] + peerId: definitions['pin_location']['peer_id'] + peerName?: definitions['pin_location']['peer_name'] + region?: definitions['pin_location']['region'] + } + } +} + +export type PinSyncRequestOutput = { + data: Array + after: definitions['pin']['inserted_at'] +} + // Backup export type BackupOutput = { _id: definitions['backup']['id'] diff --git a/packages/db/fauna/client.js b/packages/db/fauna/client.js index 80df8059a7..dc34372534 ---
a/packages/db/fauna/client.js +++ b/packages/db/fauna/client.js @@ -265,7 +265,7 @@ export class FaunaClient { * Upsert pin. * * @param {string} cid - * @param {import('../db-client-types').PinItemOutput} pin + * @param {import('../db-client-types').PinsUpsertInput} pin * @return {Promise} */ async upsertPin (cid, pin) { @@ -280,6 +280,21 @@ export class FaunaClient { return res } + /** + * Upsert given pin status. + * + * @param {Array} pins + */ + async upsertPins (pins) { + await this.query(gql` + mutation UpdatePins($pins: [UpdatePinInput!]!) { + updatePins(pins: $pins) { + _id + } + } + `, { pins }) + } + /** * Get Pins for a cid * @@ -290,6 +305,126 @@ export class FaunaClient { throw new Error('not implemented in fauna') } + /** + * Get All Pin requests. + * + * @param {Object} [options] + * @param {number} [options.size = 600] + * @return {Promise>} + */ + async getPinRequests ({ size = 600 } = {}) { + const res = await this.query(gql` + query FindAllPinRequests($size: Int!) { + findAllPinRequests(_size: $size) { + data { + _id + cid + created + } + } + } + `, { size }) + + return res.findAllPinRequests.data + } + + /** + * Delete pin requests with provided ids. + * + * @param {Array} ids + * @return {Promise} + */ + async deletePinRequests (ids) { + await this.query(gql` + mutation DeletePinRequests($requests: [ID!]!) { + deletePinRequests(requests: $requests){ + _id + } + } + `, { requests: ids }) + } + + /** + * Create pin sync requests. + * + * @param {Array} pinSyncRequests + */ + async createPinSyncRequests (pinSyncRequests) { + await this.query(gql` + mutation CreatePinSyncRequests($pins: [ID!]!) { + createPinSyncRequests(pins: $pins) { + _id + } + } + `, { pins: pinSyncRequests }) + } + + /** + * Get All Pin Sync requests. + * + * @param {Object} [options] + * @param {string} [options.to] + * @param {string} [options.afer] + * @return {Promise>} + */ + async getPinSyncRequests ({ to, after } = {}) { + const res = await this.query(gql` + query FindPinSyncRequests($to: Time, $after: String) { + findPinSyncRequests(to: $to, _size: 1000, _cursor: $after) { + data { + _id + pin { + _id + content { + _id + cid + dagSize + } + location { + peerId + } + status + created + } + } + after + } + } + `, { to, after }) + + return { + data: res.findPinSyncRequests.data.map(req => ({ + _id: req._id, + pin: { + _id: req.pin._id, + status: req.pin.status, + created: req.pin.created, + contentCid: req.pin.content.cid, + location: { + peerId: req.pin.location.peerId + } + } + })), + after: res.findPinSyncRequests.after + } + } + + /** + * Delete pin sync requests with provided ids. + * + * @param {Array} ids + * @return {Promise} + */ + async deletePinSyncRequests (ids) { + await this.query(gql` + mutation DeletePinSyncRequests($requests: [ID!]!) 
{ + deletePinSyncRequests(requests: $requests) { + _id + } + } + `, { requests: ids }) + } + /** * Get deals for a cid * diff --git a/packages/db/index.d.ts b/packages/db/index.d.ts index b47ea76681..d23359063c 100644 --- a/packages/db/index.d.ts +++ b/packages/db/index.d.ts @@ -1,5 +1,6 @@ import { gql } from 'graphql-request' import { RequestDocument } from 'graphql-request/dist/types' +import { PostgrestClient } from '@supabase/postgrest-js' import type { UpsertUserInput, @@ -16,6 +17,9 @@ import type { AuthKey, AuthKeyItemOutput, PinItemOutput, + PinRequestItemOutput, + PinSyncRequestOutput, + PinsUpsertInput, BackupOutput } from './db-client-types' @@ -23,6 +27,7 @@ export { gql } export class DBClient { constructor(config: { endpoint?: string; token: string, postgres?: boolean }) + client: PostgrestClient upsertUser (user: UpsertUserInput): Promise getUser (issuer: string): Promise getUsedStorage (userId: number): Promise @@ -34,7 +39,13 @@ export class DBClient { getStatus (cid: string): Promise getBackups(uploadId: number): Promise> upsertPin (cid: string, pin: PinItemOutput): Promise + upsertPins (pins: Array): Promise getPins (cid: string): Promise> + getPinRequests ({ size }: { size: number }): Promise> + deletePinRequests (ids: Array): Promise + createPinSyncRequests (pinSyncRequests: Array): Promise + getPinSyncRequests ({ to, after }: { to: string, after: string }): Promise + deletePinSyncRequests (ids: Array): Promise getDeals (cid: string): Promise getDealsForCids (cids: string[]): Promise> createKey (key: CreateAuthKeyInput): Promise diff --git a/packages/db/index.js b/packages/db/index.js index 6c6d19f4a0..0923a4a9b8 100644 --- a/packages/db/index.js +++ b/packages/db/index.js @@ -132,6 +132,15 @@ export class DBClient { return this._client.upsertPin(cid, pin) } + /** + * Upsert given pin status. + * + * @param {Array} pins + */ + upsertPins (pins) { + return this._client.upsertPins(pins) + } + /** * Get Pins for a cid * @@ -142,6 +151,58 @@ export class DBClient { return this._client.getPins(cid) } + /** + * Get All Pin requests. + * + * @param {Object} [options] + * @param {number} [options.size = 600] + * @return {Promise>} + */ + getPinRequests () { + return this._client.getPinRequests() + } + + /** + * Delete pin requests with provided ids. + * + * @param {Array} ids + * @return {Promise} + */ + deletePinRequests (ids) { + return this._client.deletePinRequests(ids) + } + + /** + * Create pin sync requests. + * + * @param {Array} pinSyncRequests + */ + createPinSyncRequests (pinSyncRequests) { + return this._client.createPinSyncRequests(pinSyncRequests) + } + + /** + * Get All Pin Sync requests. + * + * @param {Object} [options] + * @param {string} [options.to] + * @param {string} [options.afer] + * @return {Promise>} + */ + getPinSyncRequests (options) { + return this._client.getPinSyncRequests(options) + } + + /** + * Delete pin sync requests with provided ids. 
+ * + * @param {Array} ids + * @return {Promise} + */ + deletePinSyncRequests (ids) { + return this._client.deletePinSyncRequests(ids) + } + /** * Get deals for a cid * diff --git a/packages/db/postgres/README.md b/packages/db/postgres/README.md index 26f09afd07..61354d7eb9 100644 --- a/packages/db/postgres/README.md +++ b/packages/db/postgres/README.md @@ -67,6 +67,6 @@ node scripts/cli.js db --stop --clean --project web3-storage ## Database Diagram -![image](https://user-images.githubusercontent.com/7295071/135415822-be854ec5-d1e2-4588-a951-f287b60b65be.png) +![image](https://user-images.githubusercontent.com/7295071/137729026-50aebb55-e89c-45ed-b636-b3e39cc53cc0.png) Powered by [dbdiagram.io](https://dbdiagram.io/d/61546519825b5b014618caf6). diff --git a/packages/db/postgres/client.js b/packages/db/postgres/client.js index 5490b35561..cfb520ccf7 100644 --- a/packages/db/postgres/client.js +++ b/packages/db/postgres/client.js @@ -343,10 +343,10 @@ export class PostgresClient { } /** - * Upsert pin. + * Upsert pin of a given cid. * * @param {string} cid - * @param {import('../db-client-types').PinItemOutput} pin + * @param {import('../db-client-types').PinsUpsertInput} pin * @return {Promise} */ async upsertPin (cid, pin) { @@ -372,6 +372,21 @@ export class PostgresClient { return pinId } + /** + * Upsert given pin status. + * + * @param {Array} pins + */ + async upsertPins (pins) { + const { error } = await this._client + .from('pin') + .upsert(pins, { count: 'exact', returning: 'minimal' }) + + if (error) { + throw new DBError(error) + } + } + /** * Get Pins for a cid * @@ -398,6 +413,129 @@ export class PostgresClient { return normalizePins(pins) } + /** + * Get All Pin requests. + * + * @param {Object} [options] + * @param {number} [options.size = 600] + * @return {Promise>} + */ + async getPinRequests ({ size = 600 } = {}) { + /** @type {{ data: Array, error: Error }} */ + const { data: pinReqs, error } = await this._client + .from('pin_request') + .select(` + _id:id, + cid:content_cid, + created:inserted_at + `) + .limit(size) + + if (error) { + throw new DBError(error) + } + + return pinReqs + } + + /** + * Delete pin requests with provided ids. + * + * @param {Array} ids + * @return {Promise} + */ + async deletePinRequests (ids) { + /** @type {{ error: Error }} */ + const { error } = await this._client + .from('pin_request') + .delete() + .in('id', ids) + + if (error) { + throw new DBError(error) + } + } + + /** + * Create pin sync requests. + * + * @param {Array} pinSyncRequests + */ + async createPinSyncRequests (pinSyncRequests) { + /** @type {{ error: Error }} */ + const { error } = await this._client + .from('pin_sync_request') + .upsert(pinSyncRequests.map(psr => ({ + pin_id: psr, + inserted_at: new Date().toISOString() + })), { + onConflict: 'pin_id' + }) + + if (error) { + throw new DBError(error) + } + } + + /** + * Get All Pin Sync requests. 
+ * + * @param {Object} [options] + * @param {string} [options.to] + * @param {string} [options.afer] + * @param {number} [options.size] + * @return {Promise} + */ + async getPinSyncRequests ({ to, after, size }) { + let query = this._client + .from('pin_sync_request') + .select(` + _id:id, + pin:pin(_id:id, status, contentCid:content_cid, created:inserted_at, location:pin_location(_id:id, peerId:peer_id, peerName:peer_name, region)) + `) + .order( + 'inserted_at', + { ascending: true } + ) + .limit(size) + + if (to) { + query = query.lt('inserted_at', to) + } + if (after) { + query = query.gte('inserted_at', after) + } + + /** @type {{ data: Array, error: Error }} */ + const { data: pinSyncReqs, error } = await query + if (error) { + throw new DBError(error) + } + + return { + data: pinSyncReqs, + after: !!size && pinSyncReqs.length === size && pinSyncReqs[0].pin.created // return after if more + } + } + + /** + * Delete pin sync requests with provided ids. + * + * @param {Array} ids + * @return {Promise} + */ + async deletePinSyncRequests (ids) { + /** @type {{ error: Error }} */ + const { error } = await this._client + .from('pin_sync_request') + .delete() + .in('id', ids) + + if (error) { + throw new DBError(error) + } + } + /** * Get deals for a cid * diff --git a/packages/db/postgres/functions.sql b/packages/db/postgres/functions.sql index 326852cfad..4e097ee08d 100644 --- a/packages/db/postgres/functions.sql +++ b/packages/db/postgres/functions.sql @@ -22,6 +22,7 @@ $$ DECLARE backup_url TEXT; pin json; + pin_result_id BIGINT; pin_location_result_id BIGINT; inserted_upload_id BIGINT; BEGIN @@ -32,6 +33,14 @@ BEGIN (data ->> 'updated_at')::timestamptz, (data ->> 'inserted_at')::timestamptz) ON CONFLICT ( cid ) DO NOTHING; + + -- Add to pin_request table if new + insert into pin_request (content_cid, attempts, updated_at, inserted_at) + values (data ->> 'content_cid', + 0, + (data ->> 'updated_at')::timestamptz, + (data ->> 'inserted_at')::timestamptz) + ON CONFLICT ( content_cid ) DO NOTHING; -- Iterate over received pins foreach pin in array json_arr_to_json_element_array(data -> 'pins') @@ -53,7 +62,18 @@ BEGIN pin_location_result_id, (data ->> 'updated_at')::timestamptz, (data ->> 'inserted_at')::timestamptz) - ON CONFLICT ( content_cid, pin_location_id ) DO NOTHING; + -- Force update on conflict to get result, otherwise needs a follow up select + ON CONFLICT ( content_cid, pin_location_id ) DO UPDATE + SET "updated_at" = (data ->> 'updated_at')::timestamptz + returning id into pin_result_id; + + -- Create a Pin Sync Request if not pinned + IF (pin ->> 'status')::pin_status_type != ('Pinned')::pin_status_type THEN + insert into pin_sync_request (pin_id, inserted_at) + values (pin_result_id, + (data ->> 'inserted_at')::timestamptz) + ON CONFLICT ( pin_id ) DO NOTHING; + END IF; end loop; insert into upload (user_id, diff --git a/packages/db/postgres/pg-rest-api-types.d.ts b/packages/db/postgres/pg-rest-api-types.d.ts index 88519c977d..2636127d84 100644 --- a/packages/db/postgres/pg-rest-api-types.d.ts +++ b/packages/db/postgres/pg-rest-api-types.d.ts @@ -631,8 +631,10 @@ export interface paths { parameters: { query: { id?: parameters["rowFilter.pin_request.id"]; - pin_id?: parameters["rowFilter.pin_request.pin_id"]; + content_cid?: parameters["rowFilter.pin_request.content_cid"]; + attempts?: parameters["rowFilter.pin_request.attempts"]; inserted_at?: parameters["rowFilter.pin_request.inserted_at"]; + updated_at?: parameters["rowFilter.pin_request.updated_at"]; /** Filtering 
Columns */ select?: parameters["select"]; /** Ordering */ @@ -684,8 +686,10 @@ export interface paths { parameters: { query: { id?: parameters["rowFilter.pin_request.id"]; - pin_id?: parameters["rowFilter.pin_request.pin_id"]; + content_cid?: parameters["rowFilter.pin_request.content_cid"]; + attempts?: parameters["rowFilter.pin_request.attempts"]; inserted_at?: parameters["rowFilter.pin_request.inserted_at"]; + updated_at?: parameters["rowFilter.pin_request.updated_at"]; }; header: { /** Preference */ @@ -701,8 +705,10 @@ export interface paths { parameters: { query: { id?: parameters["rowFilter.pin_request.id"]; - pin_id?: parameters["rowFilter.pin_request.pin_id"]; + content_cid?: parameters["rowFilter.pin_request.content_cid"]; + attempts?: parameters["rowFilter.pin_request.attempts"]; inserted_at?: parameters["rowFilter.pin_request.inserted_at"]; + updated_at?: parameters["rowFilter.pin_request.updated_at"]; }; body: { /** pin_request */ @@ -719,6 +725,99 @@ export interface paths { }; }; }; + "/pin_sync_request": { + get: { + parameters: { + query: { + id?: parameters["rowFilter.pin_sync_request.id"]; + pin_id?: parameters["rowFilter.pin_sync_request.pin_id"]; + inserted_at?: parameters["rowFilter.pin_sync_request.inserted_at"]; + /** Filtering Columns */ + select?: parameters["select"]; + /** Ordering */ + order?: parameters["order"]; + /** Limiting and Pagination */ + offset?: parameters["offset"]; + /** Limiting and Pagination */ + limit?: parameters["limit"]; + }; + header: { + /** Limiting and Pagination */ + Range?: parameters["range"]; + /** Limiting and Pagination */ + "Range-Unit"?: parameters["rangeUnit"]; + /** Preference */ + Prefer?: parameters["preferCount"]; + }; + }; + responses: { + /** OK */ + 200: { + schema: definitions["pin_sync_request"][]; + }; + /** Partial Content */ + 206: unknown; + }; + }; + post: { + parameters: { + body: { + /** pin_sync_request */ + pin_sync_request?: definitions["pin_sync_request"]; + }; + query: { + /** Filtering Columns */ + select?: parameters["select"]; + }; + header: { + /** Preference */ + Prefer?: parameters["preferReturn"]; + }; + }; + responses: { + /** Created */ + 201: unknown; + }; + }; + delete: { + parameters: { + query: { + id?: parameters["rowFilter.pin_sync_request.id"]; + pin_id?: parameters["rowFilter.pin_sync_request.pin_id"]; + inserted_at?: parameters["rowFilter.pin_sync_request.inserted_at"]; + }; + header: { + /** Preference */ + Prefer?: parameters["preferReturn"]; + }; + }; + responses: { + /** No Content */ + 204: never; + }; + }; + patch: { + parameters: { + query: { + id?: parameters["rowFilter.pin_sync_request.id"]; + pin_id?: parameters["rowFilter.pin_sync_request.pin_id"]; + inserted_at?: parameters["rowFilter.pin_sync_request.inserted_at"]; + }; + body: { + /** pin_sync_request */ + pin_sync_request?: definitions["pin_sync_request"]; + }; + header: { + /** Preference */ + Prefer?: parameters["preferReturn"]; + }; + }; + responses: { + /** No Content */ + 204: never; + }; + }; + }; "/upload": { get: { parameters: { @@ -944,6 +1043,23 @@ export interface paths { }; }; }; + "/rpc/pin_dag_size_total": { + post: { + parameters: { + body: { + args: { [key: string]: unknown }; + }; + header: { + /** Preference */ + Prefer?: parameters["preferParams"]; + }; + }; + responses: { + /** OK */ + 200: unknown; + }; + }; + }; "/rpc/postgres_fdw_handler": { post: { parameters: { @@ -1054,12 +1170,29 @@ export interface paths { }; }; }; - "/rpc/user_used_storage": { + "/rpc/content_dag_size_total": { + post: { + 
parameters: { + body: { + args: { [key: string]: unknown }; + }; + header: { + /** Preference */ + Prefer?: parameters["preferParams"]; + }; + }; + responses: { + /** OK */ + 200: unknown; + }; + }; + }; + "/rpc/json_arr_to_json_element_array": { post: { parameters: { body: { args: { - query_user_id: number; + _json: string; }; }; header: { @@ -1073,12 +1206,12 @@ export interface paths { }; }; }; - "/rpc/create_upload": { + "/rpc/user_used_storage": { post: { parameters: { body: { args: { - data: string; + query_user_id: number; }; }; header: { @@ -1092,12 +1225,12 @@ export interface paths { }; }; }; - "/rpc/json_arr_to_upload__pin_type_arr": { + "/rpc/create_upload": { post: { parameters: { body: { args: { - _json: string; + data: string; }; }; header: { @@ -1227,6 +1360,21 @@ export interface definitions { region?: string; }; pin_request: { + /** + * Note: + * This is a Primary Key. + */ + id: number; + /** + * Note: + * This is a Foreign Key to `content.cid`. + */ + content_cid: string; + attempts?: number; + inserted_at: string; + updated_at: string; + }; + pin_sync_request: { /** * Note: * This is a Primary Key. @@ -1372,8 +1520,15 @@ export interface parameters { /** pin_request */ "body.pin_request": definitions["pin_request"]; "rowFilter.pin_request.id": string; - "rowFilter.pin_request.pin_id": string; + "rowFilter.pin_request.content_cid": string; + "rowFilter.pin_request.attempts": string; "rowFilter.pin_request.inserted_at": string; + "rowFilter.pin_request.updated_at": string; + /** pin_sync_request */ + "body.pin_sync_request": definitions["pin_sync_request"]; + "rowFilter.pin_sync_request.id": string; + "rowFilter.pin_sync_request.pin_id": string; + "rowFilter.pin_sync_request.inserted_at": string; /** upload */ "body.upload": definitions["upload"]; "rowFilter.upload.id": string; diff --git a/packages/db/postgres/reset.sql b/packages/db/postgres/reset.sql index e226a27cc8..678fe11572 100644 --- a/packages/db/postgres/reset.sql +++ b/packages/db/postgres/reset.sql @@ -4,6 +4,7 @@ DROP TABLE IF EXISTS upload CASCADE; DROP TABLE IF EXISTS pin CASCADE; DROP TABLE IF EXISTS pin_location; DROP TABLE IF EXISTS pin_request; +DROP TABLE IF EXISTS pin_sync_request; DROP TABLE IF EXISTS content; DROP TABLE IF EXISTS backup; DROP TABLE IF EXISTS auth_key; diff --git a/packages/db/postgres/tables.sql b/packages/db/postgres/tables.sql index ee6a8cfa41..7be5114faf 100644 --- a/packages/db/postgres/tables.sql +++ b/packages/db/postgres/tables.sql @@ -148,11 +148,22 @@ CREATE TABLE IF NOT EXISTS backup inserted_at TIMESTAMP WITH TIME ZONE DEFAULT timezone('utc'::text, now()) NOT NULL ); --- A request to keep a Pin in sync with the nodes that are pinning it. +-- Tracks requests to replicate content to more nodes. CREATE TABLE IF NOT EXISTS pin_request +( + id BIGSERIAL PRIMARY KEY, + -- Root CID of the Pin we want to replicate. + content_cid TEXT NOT NULL UNIQUE REFERENCES content (cid), + attempts INT DEFAULT 0, + inserted_at TIMESTAMP WITH TIME ZONE DEFAULT timezone('utc'::text, now()) NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT timezone('utc'::text, now()) NOT NULL +); + +-- A request to keep a Pin in sync with the nodes that are pinning it. +CREATE TABLE IF NOT EXISTS pin_sync_request ( id BIGSERIAL PRIMARY KEY, -- Identifier for the pin to keep in sync. 
- pin_id BIGINT NOT NULL REFERENCES pin (id), + pin_id BIGINT NOT NULL UNIQUE REFERENCES pin (id), inserted_at TIMESTAMP WITH TIME ZONE DEFAULT timezone('utc'::text, now()) NOT NULL -) +); diff --git a/packages/db/scripts/cmds/db.js b/packages/db/scripts/cmds/db.js index a1896efc2c..073439ad72 100644 --- a/packages/db/scripts/cmds/db.js +++ b/packages/db/scripts/cmds/db.js @@ -21,6 +21,10 @@ const __dirname = path.dirname(fileURLToPath(import.meta.url)) export async function dbCmd ({ project, init, start, stop, clean }) { const composePath = path.join(__dirname, '../../postgres/docker/docker-compose.yml') + if (!project) { + throw new Error('A project must be provided as parameter') + } + if (init) { await execa('docker-compose', [ '--file', diff --git a/packages/db/test/pin-request.spec.js b/packages/db/test/pin-request.spec.js new file mode 100644 index 0000000000..95d1ad98f6 --- /dev/null +++ b/packages/db/test/pin-request.spec.js @@ -0,0 +1,74 @@ +/* eslint-env mocha, browser */ +import assert from 'assert' +import { DBClient } from '../index' + +import { createUser, createUserAuthKey, createUpload, token } from './utils' + +describe('pin-request', () => { + /** @type {DBClient} */ + const client = new DBClient({ + endpoint: 'http://127.0.0.1:3000', + token, + postgres: true + }) + + const cids = [ + 'bafybeiczsscdsbs7aaqz55asqdf3smv6klcw3gofszvwlyarci47bgf350', + 'bafybeiczsscdsbs7aaqz55asqdf3smv6klcw3gofszvwlyarci47bgf351' + ] + + const pins1 = [ + { + status: 'Pinning', + location: { + peerId: '12D3KooWFe387JFDpgNEVCP5ARut7gRkX7YuJCXMStpkq714ziK6', + peerName: 'web3-storage-sv15', + region: 'region' + } + }, + { + status: 'Pinning', + location: { + peerId: '12D3KooWFe387JFDpgNEVCP5ARut7gRkX7YuJCXMStpkq714ziK7', + peerName: 'web3-storage-sv16', + region: 'region' + } + } + ] + let user + let authKey + const uploads = [] + + // Setup testing user + before(async () => { + user = await createUser(client) + authKey = await createUserAuthKey(client, user._id) + }) + + // Guarantee no pin requests exist + before(async () => { + const pinReqs = await client.getPinRequests() + await client.deletePinRequests(pinReqs.map(pr => pr._id)) + }) + + // Setup two uploads: first with default one pin and second with two pins + before(async () => { + const upload0 = await createUpload(client, user._id, authKey, cids[0]) + const upload1 = await createUpload(client, user._id, authKey, cids[1], { pins: pins1 }) + uploads.push(upload0) + uploads.push(upload1) + }) + + it('can get pin requests and delete them', async () => { + const pinReqs = await client.getPinRequests() + assert(pinReqs, 'pin sync requests exist') + // There is only one pin request per upload/content + assert(pinReqs.length, uploads.length, 'created pin requests for each upload') + + await client.deletePinRequests(pinReqs.map(pr => pr._id)) + + const pinReqsAfterUpdate = await client.getPinRequests() + assert(pinReqsAfterUpdate, 'can get pin requests after update') + assert.strictEqual(pinReqsAfterUpdate.length, 0, 'all pin requests were deleted') + }) +}) diff --git a/packages/db/test/pin-sync-request.spec.js b/packages/db/test/pin-sync-request.spec.js new file mode 100644 index 0000000000..53ed5ac88e --- /dev/null +++ b/packages/db/test/pin-sync-request.spec.js @@ -0,0 +1,155 @@ +/* eslint-env mocha, browser */ +import assert from 'assert' +import { DBClient } from '../index' + +import { createUser, createUserAuthKey, createUpload, defaultPinData, token } from './utils' + +describe('pin-sync-request', () => { + /** @type {DBClient} */ + 
const client = new DBClient({ + endpoint: 'http://127.0.0.1:3000', + token, + postgres: true + }) + + const cids = [ + 'bafybeiczsscdsbs7aaqz55asqdf3smv6klcw3gofszvwlyarci47bgf350', + 'bafybeiczsscdsbs7aaqz55asqdf3smv6klcw3gofszvwlyarci47bgf351', + 'bafybeiczsscdsbs7aaqz55asqdf3smv6klcw3gofszvwlyarci47bgf352' + ] + let user + let authKey + const uploads = [] + + // Setup testing user + before(async () => { + user = await createUser(client) + authKey = await createUserAuthKey(client, user._id) + }) + + // Guarantee no pin sync requests exist + before(async () => { + const to = new Date().toISOString() + const { data: pinSyncReqs } = await client.getPinSyncRequests({ to }) + + await client.deletePinSyncRequests(pinSyncReqs.map(psr => psr._id)) + }) + + // Setup two default uploads + before(async () => { + const upload0 = await createUpload(client, user._id, authKey, cids[0]) + const upload1 = await createUpload(client, user._id, authKey, cids[1]) + + uploads.push(upload0) + uploads.push(upload1) + }) + + it('created pin sync requests for the uploads', async () => { + const to = new Date().toISOString() + + const { data: pinSyncReqs } = await client.getPinSyncRequests({ to }) + assert(pinSyncReqs, 'pin sync requests exist') + + // expect pin sync requests = added pins for each upload where status is not pinned + const expectedPinSyncReqs = defaultPinData.filter(pd => pd.status !== 'Pinned').length * 2 + assert.strictEqual(pinSyncReqs.length, expectedPinSyncReqs, 'created pin sync requests for non pinned entries') + }) + + it('create multiple pin sync requests when upload has multiple pins', async () => { + const pins = [ + { + status: 'Pinning', + location: { + peerId: '12D3KooWFe387JFDpgNEVCP5ARut7gRkX7YuJCXMStpkq714ziK6', + peerName: 'web3-storage-sv15', + region: 'region' + } + }, + { + status: 'Pinning', + location: { + peerId: '12D3KooWFe387JFDpgNEVCP5ARut7gRkX7YuJCXMStpkq714ziK7', + peerName: 'web3-storage-sv16', + region: 'region' + } + }, + { + status: 'Pinned', + location: { + peerId: '12D3KooWFe387JFDpgNEVCP5ARut7gRkX7YuJCXMStpkq714ziK8', + peerName: 'web3-storage-sv17', + region: 'region' + } + } + ] + + await createUpload(client, user._id, authKey, cids[2], { pins }) + const to = new Date().toISOString() + + const { data: pinSyncReqs } = await client.getPinSyncRequests({ to }) + assert(pinSyncReqs, 'pin sync requests exist') + + // From the 2 setup uploads expected pins + const previousExpectedPinSyncReqs = defaultPinData.filter(pd => pd.status !== 'Pinned').length * 2 + // Pins for the new upload + const newPinSyncReqs = pins.filter(pd => pd.status !== 'Pinned').length + assert.strictEqual(pinSyncReqs.length, newPinSyncReqs + previousExpectedPinSyncReqs, 'created pin sync requests for non pinned entries') + }) + + it('can update multiple pin status', async () => { + const to = new Date().toISOString() + const { data: pinSyncReqs } = await client.getPinSyncRequests({ to }) + + // Assert Previous pin state + pinSyncReqs.forEach(psr => { + assert.strictEqual(psr.pin.status, 'Pinning', 'pin sync requests have Pinning state') + }) + + // Update all Pins to Pinned + await client.upsertPins(pinSyncReqs.map(psr => ({ + id: psr.pin._id, + status: 'Pinned', + content_cid: psr.pin.contentCid, + pin_location_id: psr.pin.location._id, + updated_at: new Date().toISOString() + }))) + + const { data: pinSyncReqsAfterUpdate } = await client.getPinSyncRequests({ to }) + + // Assert After pin state + pinSyncReqsAfterUpdate.forEach(psr => { + assert.strictEqual(psr.pin.status, 'Pinned', 'pin sync 
requests have Pinned state') + }) + }) + + it('can delete pin sync requests', async () => { + const to = new Date().toISOString() + const { data: pinSyncReqs } = await client.getPinSyncRequests({ to }) + + await client.deletePinSyncRequests(pinSyncReqs.map(psr => psr._id)) + + const { data: pinSyncReqsAfterUpdate } = await client.getPinSyncRequests({ to }) + assert(pinSyncReqsAfterUpdate, 'could get pin sync requests') + assert.strictEqual(pinSyncReqsAfterUpdate.length, 0, 'all pin sync requests were deleted') + }) + + it('can create pin sync requests', async () => { + const { data: pinSyncReqs } = await client.getPinSyncRequests({ to: new Date().toISOString() }) + const previousLength = pinSyncReqs.length + + // Get pins + const pins0 = await client.getPins(cids[0]) + const pins1 = await client.getPins(cids[1]) + const pinIds = [ + ...pins0.map(p => p._id), + ...pins1.map(p => p._id) + ] + + // Create pin sync requests + await client.createPinSyncRequests(pinIds) + const { data: pinSyncReqsAfterUpdate } = await client.getPinSyncRequests({ to: new Date().toISOString() }) + + assert(pinSyncReqsAfterUpdate, 'could get pin sync requests') + assert.strictEqual(pinSyncReqsAfterUpdate.length, pinIds.length + previousLength, 'all pin sync requests were created') + }) +}) diff --git a/packages/db/test/utils.js b/packages/db/test/utils.js index 1ecebee4e4..83a19fd17d 100644 --- a/packages/db/test/utils.js +++ b/packages/db/test/utils.js @@ -1 +1,77 @@ export const token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoicG9zdGdyZXMifQ.oM0SXF31Vs1nfwCaDxjlczE237KcNKhTpKEYxMX-jEU' + +/** + * @param {import('../index').DBClient} dbClient + * @param {Object} [options] + * @param {string} [options.name] + * @param {string} [options.email] + * @param {string} [options.issuer] + * @param {string} [options.publicAddress] + */ +export async function createUser (dbClient, options = {}) { + const issuer = options.issuer || `issuer${Math.random()}` + await dbClient.upsertUser({ + name: options.name || 'test-name', + email: options.email || 'test@email.com', + issuer, + publicAddress: options.publicAddress || `public_address${Math.random()}` + }) + + return dbClient.getUser(issuer) +} + +/** + * @param {import('../index').DBClient} dbClient + * @param {number} user + * @param {Object} [options] + * @param {string} [options.name] + * @param {string} [options.secret] + */ +export async function createUserAuthKey (dbClient, user, options = {}) { + const { _id } = await dbClient.createKey({ + name: options.name || 'test-key-name', + secret: options.secret || 'test-secret', + user + }) + + return _id +} + +export const defaultPinData = [{ + status: 'Pinning', + location: { + peerId: '12D3KooWFe387JFDpgNEVCP5ARut7gRkX7YuJCXMStpkq714ziK6', + peerName: 'web3-storage-sv15', + region: 'region' + } +}] + +/** + * @param {import('../index').DBClient} dbClient + * @param {number} user + * @param {number} authKey + * @param {string} cid + * @param {Object} [options] + * @param {string} [options.type] + * @param {number} [options.dagSize] + * @param {string} [options.name] + * @param {Array} [options.pins] + * @param {Array} [options.backupUrls] + */ +export async function createUpload (dbClient, user, authKey, cid, options = {}) { + const initialBackupUrl = `https://backup.cid/${new Date().toISOString()}/${Math.random()}` + + await dbClient.createUpload({ + user: user, + contentCid: cid, + sourceCid: cid, + authKey: authKey, + type: options.type || 'Upload', + dagSize: options.dagSize || 1000, + name: options.name || 
`Upload_${new Date().toISOString()}`, + pins: options.pins || defaultPinData, + backupUrls: options.backupUrls || [initialBackupUrl] + }) + + return dbClient.getUpload(cid, user) +} diff --git a/packages/pinpin/README.md b/packages/pinpin/README.md index 3d1fdb65a1..391d56b776 100644 --- a/packages/pinpin/README.md +++ b/packages/pinpin/README.md @@ -15,8 +15,16 @@ Ensure you have all the dependencies, by running `npm i` in the parent project. To run this locally you will need the following in your `packages/cron/.env` file: ```ini +DATABASE=postgres + FAUNA_KEY="" PINATA_JWT="" + +# PostgREST API URL +PG_REST_URL=http://localhost:3000 +# PostgREST API token, for role "postgres", using secret value PGRST_JWT_SECRET from './postgres/docker/docker-compose.yml' +# https://postgrest.org/en/v8.0/tutorials/tut1.html#step-3-sign-a-token +PG_REST_JWT=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoicG9zdGdyZXMifQ.oM0SXF31Vs1nfwCaDxjlczE237KcNKhTpKEYxMX-jEU ``` You also need to have: @@ -31,7 +39,7 @@ npm run start ## Running on Heroku -Deploy using the `herkou` command. Install the `heroku` cli and log in with `heroku login`. A Procfile is added to the root of the monorepo to define the `pinpin` process. It's a worker that should run on a single dyno. There is no `web` worker, which heroku assumes there will be, so we have to explicitly scale the dynos to make that the case. Deployment uses git, and you get `heroku` remote to push to, added to your repo automatically. +Deploy using the `heroku` command. Install the `heroku` cli and log in with `heroku login`. A Procfile is added to the root of the monorepo to define the `pinpin` process. It's a worker that should run on a single dyno. There is no `web` worker, which heroku assumes there will be, so we have to explicitly scale the dynos to make that the case. Deployment uses git, and you get `heroku` remote to push to, added to your repo automatically. Deploying from scratch looks like this: diff --git a/packages/pinpin/src/bin.js b/packages/pinpin/src/bin.js index 63d9f57e02..a7d380e888 100755 --- a/packages/pinpin/src/bin.js +++ b/packages/pinpin/src/bin.js @@ -29,6 +29,18 @@ async function main () { * @param {Record} env */ function getDBClient (env) { + if (env.DATABASE === 'postgres') { + const token = env.PG_REST_JWT + const endpoint = env.PG_REST_URL + if (!token) { + throw new Error('missing PG_REST_JWT environment var') + } + if (!endpoint) { + throw new Error('missing PG_REST_URL environment var') + } + return new DBClient({ token, endpoint, postgres: true }) + } + const token = env.FAUNA_KEY if (!token) { throw new Error('missing FAUNA_KEY environment var') diff --git a/packages/pinpin/src/index.js b/packages/pinpin/src/index.js index ba740819e3..a9fe415d31 100644 --- a/packages/pinpin/src/index.js +++ b/packages/pinpin/src/index.js @@ -5,7 +5,6 @@ * We can only make 3req/s to PiΓ±ata max, so we rate limit in the client to 2 req/s * As such we'll try batches of 600 and see how we go. */ -import { gql } from '@web3-storage/db' import debug from 'debug' import retry from 'p-retry' @@ -13,38 +12,6 @@ import retry from 'p-retry' const MAX_PIN_REQUESTS_PER_RUN = 600 const log = debug('pinpin') -const FIND_BATCH = gql` - query FindAllPinRequests($size: Int!) { - findAllPinRequests(_size: $size) { - data { - _id - cid - created - } - } - } -` - -const DELETE_PIN_REQUESTS = gql` - mutation DeletePinRequests($requests: [ID!]!) 
{ - deletePinRequests(requests: $requests){ - _id - } - } -` - -/** - * Fetch a batch of PinRequests with CIDs to pin - * - * @param {import('@web3-storage/db').DBClient} db - * @returns {Array<{_id: string, cid: string}>} - */ -async function getPinRequests (db) { - const size = MAX_PIN_REQUESTS_PER_RUN - const res = await db.query(FIND_BATCH, { size }) - return res.findAllPinRequests.data -} - /** * Find PinRequests and pin them to Piñata * * @param {import('@web3-storage/db').DBClient} db * @param {import('./pinata').Pinata} pinata */ export async function pinToPinata ({ db, pinata }) { if (!log.enabled) { console.log('ℹ️ Enable logging by setting DEBUG=pinpin') } log('📡 Fetching Pin Requests from DB') - const pinReqs = await retry(() => getPinRequests(db), { onFailedAttempt: log }) + const pinReqs = await retry(() => db.getPinRequests({ size: MAX_PIN_REQUESTS_PER_RUN }), { onFailedAttempt: log }) const total = pinReqs.length const pinned = [] @@ -80,7 +47,7 @@ export async function pinToPinata ({ db, pinata }) { })) log(`📡 Deleting ${pinned.length} processed Pin Requests`) - await retry(() => db.query(DELETE_PIN_REQUESTS, { requests: pinned }), { onFailedAttempt: log }) + await retry(() => db.deletePinRequests(pinned), { onFailedAttempt: log }) log(`🎉 Done! Pinned ${pinned.length} of ${total}`) return { total, pinned }
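The pins cron job above consumes the cursor-style contract this patch introduces for `DBClient#getPinSyncRequests`, which returns `{ data, after }` and yields an `after` value only while full pages keep coming. A minimal sketch of that consumption pattern, assuming a configured `DBClient` instance; `handlePage` is a hypothetical callback standing in for the real per-page work done in `packages/cron/src/jobs/pins.js`:

```js
// Sketch only: drain pin sync requests page by page using the
// { data, after } contract of getPinSyncRequests.
// `db` is a DBClient; `handlePage` is a hypothetical callback.
async function drainPinSyncRequests (db, handlePage) {
  const to = new Date().toISOString() // ignore requests created after this run started
  const size = 600 // page size, mirroring MAX_PIN_REQUESTS_PER_RUN
  let after

  while (true) {
    const page = await db.getPinSyncRequests({ to, after, size })
    await handlePage(page.data) // e.g. check cluster status, upsertPins, deletePinSyncRequests
    if (!page.after) break // no cursor means the last (possibly partial) page was fetched
    after = page.after
  }
}
```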