Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use rpc for pins upsert #1088

Merged
merged 14 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/api/src/utils/pin.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export async function waitAndUpdateOkPins (cid, cluster, db, waitTime = MAX_PIN_
id: pin._id,
status: pin.status,
cid,
locationId: pin.location._id
location: pin.location
}
})
await db.upsertPins(pins)
Expand Down
2 changes: 1 addition & 1 deletion packages/cron/src/jobs/pins.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export async function updatePinStatuses ({ cluster, db }) {
id: pin._id,
status: status,
cid: pin.contentCid,
locationId: pin.location._id
location: pin.location
}
}))

Expand Down
2 changes: 1 addition & 1 deletion packages/db/db-client-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export type PinsUpsertInput = {
id: string
status: definitions['pin']['status']
cid: definitions['pin_request']['content_cid']
locationId: string
location: Location
}

export type PinItemOutput = {
Expand Down
25 changes: 15 additions & 10 deletions packages/db/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -452,20 +452,25 @@ export class DBClient {
* @param {Array<import('./db-client-types').PinsUpsertInput>} pins
*/
async upsertPins (pins) {
const now = new Date().toISOString()
const { error } = await this._client
.from('pin')
.upsert(pins.map(pin => ({
id: pin.id,
status: pin.status,
content_cid: pin.cid,
pin_location_id: pin.locationId,
updated_at: now
})), { count: 'exact', returning: 'minimal' })
const { data, error } = await this._client.rpc('upsert_pins', {
data: {
pins: pins.map((pin) => ({
status: pin.status,
cid: pin.cid,
location: {
peer_id: pin.location.peerId,
peer_name: pin.location.peerName,
region: pin.location.region
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be aware this will change soon (today) to have one more thing: https://github.com/web3-storage/web3.storage/pull/1093/files

So let's wait on that PR to get in and update this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged latest to include ipfs peer id.

}
}))
}
})

if (error) {
throw new DBError(error)
}

return data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm not mistaken the rpc call is currently returning void, so there's no actual useful data to return here.
It'd be probably nice to return an array of the inserted pin ids, to be consistent with upsertPin. If not we get rid of the return statement here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we could be consistent and return Pin Ids

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amended to return pinIds. Also amended upsert_pin to return pin_id not pin_location_id.

}

/**
Expand Down
38 changes: 38 additions & 0 deletions packages/db/postgres/functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,44 @@ BEGIN
END
$$;

CREATE OR REPLACE FUNCTION upsert_pins(data json) RETURNS VOID
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
pin_data json;
pin_location_result_id BIGINT;
pin_result_id BIGINT;
BEGIN
FOREACH pin_data IN array json_arr_to_json_element_array(data -> 'pins')
LOOP
-- Add to pin_location table if new
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we be calling upsert_pin here instead?
We'd avoid some duplication. There's an inconsistency with the data inputs, in one function we use content_cid and in the other we usecid, but I think that can be easily changed? And we'd benefit from a bit more consistency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected to re-use upsert_pin and to return an array of pin Ids.
Also changed to use content_cid consistenly.

INSERT INTO pin_location (peer_id, peer_name, region)
VALUES (pin_data -> 'location' ->> 'peer_id',
pin_data -> 'location' ->> 'peer_name',
pin_data -> 'location' ->> 'region')
-- Force update on conflict to get result, otherwise needs a follow up select
ON CONFLICT ( peer_id ) DO UPDATE
SET "peer_name" = pin_data -> 'location' ->> 'peer_name',
"region" = pin_data -> 'location' ->> 'region'
RETURNING id INTO pin_location_result_id;

INSERT INTO pin (content_cid, status, pin_location_id, updated_at)
VALUES (pin_data ->> 'cid',
(pin_data ->> 'status')::pin_status_type,
pin_location_result_id,
(NOW())::timestamptz)
-- Force update on conflict to get result, otherwise needs a follow up select
ON CONFLICT ( content_cid, pin_location_id ) DO UPDATE
SET "status" = (pin_data ->> 'status')::pin_status_type,
"updated_at" = NOW()
RETURNING id INTO pin_result_id;
END LOOP;
END
$$;

CREATE OR REPLACE FUNCTION upsert_pin(data json) RETURNS TEXT
LANGUAGE plpgsql
volatile
Expand Down
5 changes: 3 additions & 2 deletions packages/db/test/pin-sync-request.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ describe('pin-sync-request', () => {
it('can update multiple pin status', async () => {
const to = new Date().toISOString()
const { data: pinSyncReqs } = await client.getPinSyncRequests({ to })

// Assert Previous pin state
pinSyncReqs.forEach(psr => {
assert.strictEqual(psr.pin.status, 'Pinning', 'pin sync requests have Pinning state')
Expand All @@ -113,7 +112,9 @@ describe('pin-sync-request', () => {
id: psr.pin._id,
status: 'Pinned',
cid: psr.pin.contentCid,
locationId: psr.pin.location._id
location: {
...psr.pin.location
}
})))

const { data: pinSyncReqsAfterUpdate } = await client.getPinSyncRequests({ to })
Expand Down