Skip to content

Commit

Permalink
fix: use rpc for pins upsert (#1088)
Browse files Browse the repository at this point in the history
* fix: use rpc for pins upsert

* fix: lint correction

* fix: remove code duplication in functions

* fix: included ipfs peerid + now returning pin ids

* fix: cron mock for upsert pins

* chore: add migration script, adjust mock

* fix: improve pin_location insert

* fix: corrected migration
  • Loading branch information
GaryHomewood authored Mar 16, 2022
1 parent 72b8073 commit 6a8e394
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 19 deletions.
4 changes: 2 additions & 2 deletions packages/api/src/utils/pin.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ export async function waitAndUpdateOkPins (cid, cluster, db, waitTime = MAX_PIN_
return {
id: pin._id,
status: pin.status,
cid,
locationId: pin.location._id
contentCid: cid,
location: pin.location
}
})
await db.upsertPins(pins)
Expand Down
4 changes: 2 additions & 2 deletions packages/cron/src/jobs/pins.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ export async function updatePinStatuses ({ cluster, db }) {
return {
id: pin._id,
status: status,
cid: pin.contentCid,
locationId: pin.location._id
contentCid: pin.contentCid,
location: pin.location
}
}))

Expand Down
1 change: 1 addition & 0 deletions packages/cron/test/mocks/pgrest/rpc#upsert_pins.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
["1", "2", "3"]
4 changes: 2 additions & 2 deletions packages/db/db-client-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ export type PinItem = PinUpsertInput & {
export type PinsUpsertInput = {
id: string
status: definitions['pin']['status']
cid: definitions['pin_request']['content_cid']
locationId: string
contentCid: definitions['pin_request']['content_cid']
location: Location
}

export type PinItemOutput = {
Expand Down
30 changes: 20 additions & 10 deletions packages/db/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -445,20 +445,30 @@ export class DBClient {
* @param {Array<import('./db-client-types').PinsUpsertInput>} pins
*/
async upsertPins (pins) {
const now = new Date().toISOString()
const { error } = await this._client
.from('pin')
.upsert(pins.map(pin => ({
id: pin.id,
status: pin.status,
content_cid: pin.cid,
pin_location_id: pin.locationId,
updated_at: now
})), { count: 'exact', returning: 'minimal' })
const { data: pinIds, error } = await this._client.rpc('upsert_pins', {
data: {
pins: pins.map((pin) => ({
data: {
content_cid: pin.contentCid,
pin: {
status: pin.status,
location: {
peer_id: pin.location.peerId,
peer_name: pin.location.peerName,
ipf_peer_id: pin.location.ipfsPeerId,
region: pin.location.region
}
}
}
}))
}
}).single()

if (error) {
throw new DBError(error)
}

return pinIds
}

/**
Expand Down
20 changes: 20 additions & 0 deletions packages/db/postgres/functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ DROP FUNCTION IF EXISTS json_arr_to_json_element_array;
DROP FUNCTION IF EXISTS create_key;
DROP FUNCTION IF EXISTS create_upload;
DROP FUNCTION IF EXISTS upsert_pin;
DROP FUNCTION IF EXISTS upsert_pins;
DROP FUNCTION IF EXISTS user_used_storage;
DROP FUNCTION IF EXISTS user_auth_keys_list;
DROP FUNCTION IF EXISTS find_deals_by_content_cids;
Expand Down Expand Up @@ -249,6 +250,25 @@ BEGIN
END
$$;

CREATE OR REPLACE FUNCTION upsert_pins(data json) RETURNS TEXT[]
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
pin json;
pin_ids TEXT[];
BEGIN
FOREACH pin IN array json_arr_to_json_element_array(data -> 'pins')
LOOP
SELECT pin_ids || upsert_pin(pin -> 'data') INTO pin_ids;
END LOOP;

RETURN pin_ids;
END
$$;

CREATE OR REPLACE FUNCTION user_used_storage(query_user_id BIGINT) RETURNS TEXT
LANGUAGE plpgsql
AS
Expand Down
20 changes: 20 additions & 0 deletions packages/db/postgres/migrations/005-add-upsert-pins-function.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
DROP FUNCTION IF EXISTS upsert_pins;

CREATE OR REPLACE FUNCTION upsert_pins(data json) RETURNS TEXT[]
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
pin json;
pin_ids TEXT[];
BEGIN
FOREACH pin IN array json_arr_to_json_element_array(data -> 'pins')
LOOP
SELECT pin_ids || upsert_pin(pin -> 'data') INTO pin_ids;
END LOOP;

RETURN pin_ids;
END
$$;
7 changes: 4 additions & 3 deletions packages/db/test/pin-sync-request.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ describe('pin-sync-request', () => {
it('can update multiple pin status', async () => {
const to = new Date().toISOString()
const { data: pinSyncReqs } = await client.getPinSyncRequests({ to })

// Assert Previous pin state
pinSyncReqs.forEach(psr => {
assert.strictEqual(psr.pin.status, 'Pinning', 'pin sync requests have Pinning state')
Expand All @@ -115,8 +114,10 @@ describe('pin-sync-request', () => {
await client.upsertPins(pinSyncReqs.map(psr => ({
id: psr.pin._id,
status: 'Pinned',
cid: psr.pin.contentCid,
locationId: psr.pin.location._id
contentCid: psr.pin.contentCid,
location: {
...psr.pin.location
}
})))

const { data: pinSyncReqsAfterUpdate } = await client.getPinSyncRequests({ to })
Expand Down

0 comments on commit 6a8e394

Please sign in to comment.