diff --git a/packages/api/src/utils/pin.js b/packages/api/src/utils/pin.js index afa2b1d1ec..1d58be4eaf 100644 --- a/packages/api/src/utils/pin.js +++ b/packages/api/src/utils/pin.js @@ -148,8 +148,8 @@ export async function waitAndUpdateOkPins (cid, cluster, db, waitTime = MAX_PIN_ return { id: pin._id, status: pin.status, - cid, - locationId: pin.location._id + contentCid: cid, + location: pin.location } }) await db.upsertPins(pins) diff --git a/packages/cron/src/jobs/pins.js b/packages/cron/src/jobs/pins.js index ef78ce77fa..1829af8a84 100644 --- a/packages/cron/src/jobs/pins.js +++ b/packages/cron/src/jobs/pins.js @@ -113,8 +113,8 @@ export async function updatePinStatuses ({ cluster, db }) { return { id: pin._id, status: status, - cid: pin.contentCid, - locationId: pin.location._id + contentCid: pin.contentCid, + location: pin.location } })) diff --git a/packages/cron/test/mocks/pgrest/rpc#upsert_pins.json b/packages/cron/test/mocks/pgrest/rpc#upsert_pins.json new file mode 100644 index 0000000000..28120b42bf --- /dev/null +++ b/packages/cron/test/mocks/pgrest/rpc#upsert_pins.json @@ -0,0 +1 @@ +["1", "2", "3"] \ No newline at end of file diff --git a/packages/db/db-client-types.ts b/packages/db/db-client-types.ts index 8e7bd54881..4fe55e1ccc 100644 --- a/packages/db/db-client-types.ts +++ b/packages/db/db-client-types.ts @@ -76,8 +76,8 @@ export type PinItem = PinUpsertInput & { export type PinsUpsertInput = { id: string status: definitions['pin']['status'] - cid: definitions['pin_request']['content_cid'] - locationId: string + contentCid: definitions['pin_request']['content_cid'] + location: Location } export type PinItemOutput = { diff --git a/packages/db/index.js b/packages/db/index.js index 29ce140b94..bf347e08db 100644 --- a/packages/db/index.js +++ b/packages/db/index.js @@ -445,20 +445,30 @@ export class DBClient { * @param {Array} pins */ async upsertPins (pins) { - const now = new Date().toISOString() - const { error } = await this._client - .from('pin') - .upsert(pins.map(pin => ({ - id: pin.id, - status: pin.status, - content_cid: pin.cid, - pin_location_id: pin.locationId, - updated_at: now - })), { count: 'exact', returning: 'minimal' }) + const { data: pinIds, error } = await this._client.rpc('upsert_pins', { + data: { + pins: pins.map((pin) => ({ + data: { + content_cid: pin.contentCid, + pin: { + status: pin.status, + location: { + peer_id: pin.location.peerId, + peer_name: pin.location.peerName, + ipf_peer_id: pin.location.ipfsPeerId, + region: pin.location.region + } + } + } + })) + } + }).single() if (error) { throw new DBError(error) } + + return pinIds } /** diff --git a/packages/db/postgres/functions.sql b/packages/db/postgres/functions.sql index bf96b613e5..9d453d7100 100644 --- a/packages/db/postgres/functions.sql +++ b/packages/db/postgres/functions.sql @@ -3,6 +3,7 @@ DROP FUNCTION IF EXISTS json_arr_to_json_element_array; DROP FUNCTION IF EXISTS create_key; DROP FUNCTION IF EXISTS create_upload; DROP FUNCTION IF EXISTS upsert_pin; +DROP FUNCTION IF EXISTS upsert_pins; DROP FUNCTION IF EXISTS user_used_storage; DROP FUNCTION IF EXISTS user_auth_keys_list; DROP FUNCTION IF EXISTS find_deals_by_content_cids; @@ -249,6 +250,25 @@ BEGIN END $$; +CREATE OR REPLACE FUNCTION upsert_pins(data json) RETURNS TEXT[] + LANGUAGE plpgsql + volatile + PARALLEL UNSAFE +AS +$$ +DECLARE + pin json; + pin_ids TEXT[]; +BEGIN + FOREACH pin IN array json_arr_to_json_element_array(data -> 'pins') + LOOP + SELECT pin_ids || upsert_pin(pin -> 'data') INTO pin_ids; + END LOOP; + + RETURN pin_ids; +END +$$; + CREATE OR REPLACE FUNCTION user_used_storage(query_user_id BIGINT) RETURNS TEXT LANGUAGE plpgsql AS diff --git a/packages/db/postgres/migrations/005-add-upsert-pins-function.sql b/packages/db/postgres/migrations/005-add-upsert-pins-function.sql new file mode 100644 index 0000000000..24248dc6b0 --- /dev/null +++ b/packages/db/postgres/migrations/005-add-upsert-pins-function.sql @@ -0,0 +1,20 @@ +DROP FUNCTION IF EXISTS upsert_pins; + +CREATE OR REPLACE FUNCTION upsert_pins(data json) RETURNS TEXT[] + LANGUAGE plpgsql + volatile + PARALLEL UNSAFE +AS +$$ +DECLARE + pin json; + pin_ids TEXT[]; +BEGIN + FOREACH pin IN array json_arr_to_json_element_array(data -> 'pins') + LOOP + SELECT pin_ids || upsert_pin(pin -> 'data') INTO pin_ids; + END LOOP; + + RETURN pin_ids; +END +$$; diff --git a/packages/db/test/pin-sync-request.spec.js b/packages/db/test/pin-sync-request.spec.js index 50c346939a..5bd437fa3e 100644 --- a/packages/db/test/pin-sync-request.spec.js +++ b/packages/db/test/pin-sync-request.spec.js @@ -105,7 +105,6 @@ describe('pin-sync-request', () => { it('can update multiple pin status', async () => { const to = new Date().toISOString() const { data: pinSyncReqs } = await client.getPinSyncRequests({ to }) - // Assert Previous pin state pinSyncReqs.forEach(psr => { assert.strictEqual(psr.pin.status, 'Pinning', 'pin sync requests have Pinning state') @@ -115,8 +114,10 @@ describe('pin-sync-request', () => { await client.upsertPins(pinSyncReqs.map(psr => ({ id: psr.pin._id, status: 'Pinned', - cid: psr.pin.contentCid, - locationId: psr.pin.location._id + contentCid: psr.pin.contentCid, + location: { + ...psr.pin.location + } }))) const { data: pinSyncReqsAfterUpdate } = await client.getPinSyncRequests({ to })