Skip to content

Commit

Permalink
feat: implement RateLimitStorage and integrate it into upload-api (#216)
Browse files Browse the repository at this point in the history
Implement the new RateLimitStorage interface from
storacha/w3up#832 and integrate it into the
upload-api.

This gives us the ability to block uploads to a space and to block an
email or domain name from authorizing with our service.

Note that we leave a couple of the new capabilities unimplemented for
now since we don't need them urgently. We are actively hoping to block
some abusive users ASAP, so I'm getting the blocking part of this work
in now and will then turn my attention to the remaining capabilities.

TODO
- [x] remove file dependencies from package.json and point at latest
versions of deps once storacha/w3up#832 is
merged and released
  • Loading branch information
travis authored Aug 11, 2023
1 parent 2c279cd commit 3b032d1
Show file tree
Hide file tree
Showing 20 changed files with 16,633 additions and 19,654 deletions.
35,713 changes: 16,358 additions & 19,355 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion stacks/upload-api-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export function UploadApiStack({ stack, app }) {

// Get references to constructs created in other stacks
const { carparkBucket } = use(CarparkStack)
const { storeTable, uploadTable, delegationBucket, delegationTable, adminMetricsTable, consumerTable, subscriptionTable } = use(UploadDbStack)
const { storeTable, uploadTable, delegationBucket, delegationTable, adminMetricsTable, consumerTable, subscriptionTable, rateLimitTable } = use(UploadDbStack)
const { invocationBucket, taskBucket, workflowBucket, ucanStream } = use(UcanInvocationStack)

// Setup API
Expand All @@ -43,6 +43,7 @@ export function UploadApiStack({ stack, app }) {
delegationBucket,
consumerTable,
subscriptionTable,
rateLimitTable,
adminMetricsTable,
carparkBucket,
invocationBucket,
Expand All @@ -56,6 +57,7 @@ export function UploadApiStack({ stack, app }) {
UPLOAD_TABLE_NAME: uploadTable.tableName,
CONSUMER_TABLE_NAME: consumerTable.tableName,
SUBSCRIPTION_TABLE_NAME: subscriptionTable.tableName,
RATE_LIMIT_TABLE_NAME: rateLimitTable.tableName,
DELEGATION_TABLE_NAME: delegationTable.tableName,
DELEGATION_BUCKET_NAME: delegationBucket.bucketName,
INVOCATION_BUCKET_NAME: invocationBucket.bucketName,
Expand Down
13 changes: 10 additions & 3 deletions stacks/upload-db-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import {
uploadTableProps,
consumerTableProps,
subscriptionTableProps,
delegationTableProps
delegationTableProps,
rateLimitTableProps
} from '../upload-api/tables/index.js'
import {
adminMetricsTableProps,
Expand All @@ -25,7 +26,7 @@ export function UploadDbStack({ stack, app }) {
* This table takes a stored CAR and makes an entry in the store table
* Used by the store/* service capabilities.
*/
const storeTable = new Table(stack, 'store', storeTableProps)
const storeTable = new Table(stack, 'store', storeTableProps)

/**
* This table maps stored CAR files (shards) to an upload root cid.
Expand All @@ -42,7 +43,12 @@ export function UploadDbStack({ stack, app }) {
* This table tracks the relationship between subscriptions and consumers (ie, spaces).
*/
const consumerTable = new Table(stack, 'consumer', consumerTableProps)


/**
* This table tracks rate limits we have imposed on subjects.
*/
const rateLimitTable = new Table(stack, 'rate-limit', rateLimitTableProps)

/**
* This bucket stores delegations extracted from UCAN invocations.
*/
Expand Down Expand Up @@ -73,6 +79,7 @@ export function UploadDbStack({ stack, app }) {
uploadTable,
consumerTable,
subscriptionTable,
rateLimitTable,
delegationBucket,
delegationTable,
adminMetricsTable,
Expand Down
14 changes: 9 additions & 5 deletions test/helpers/up-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function getAuthLinkFromEmail (email, uploadServiceUrl) {
const link = email.match(/<a href="([^"]*)".*Verify email address/)[1]

// test auth services always link to the staging URL but we want to hit the service we're testing
return link.replace("https://w3access-staging.protocol-labs.workers.dev", uploadServiceUrl)
return link.replace("https://staging.up.web3.storage", uploadServiceUrl)
}

async function createMailSlurpInbox() {
Expand All @@ -31,17 +31,21 @@ async function createMailSlurpInbox() {
}
}

export async function setupNewClient (uploadServiceUrl, options = {}) {
// create an inbox
const { mailslurp, id: inboxId, email } = await createMailSlurpInbox()
export async function createNewClient(uploadServiceUrl) {
const principal = await Signer.generate()
const data = await AgentData.create({ principal })
const client = new Client(data, {
return new Client(data, {
serviceConf: {
upload: getUploadServiceConnection(uploadServiceUrl),
access: getAccessServiceConnection(uploadServiceUrl)
},
})
}

export async function setupNewClient (uploadServiceUrl, options = {}) {
// create an inbox
const { mailslurp, id: inboxId, email } = await createMailSlurpInbox()
const client = await createNewClient(uploadServiceUrl)

const timeoutMs = process.env.MAILSLURP_TIMEOUT ? parseInt(process.env.MAILSLURP_TIMEOUT) : 60_000
const authorizePromise = client.authorize(email)
Expand Down
66 changes: 64 additions & 2 deletions test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { fetch } from '@web-std/fetch'
import git from 'git-rev-sync'
import pWaitFor from 'p-wait-for'
import { HeadObjectCommand } from '@aws-sdk/client-s3'
import {
PutItemCommand
} from '@aws-sdk/client-dynamodb'
import { marshall } from '@aws-sdk/util-dynamodb'

import { METRICS_NAMES, SPACE_METRICS_NAMES } from '../ucan-invocation/constants.js'
import { test } from './helpers/context.js'
Expand All @@ -14,15 +18,16 @@ import {
getCarparkBucketInfo,
getDynamoDb
} from './helpers/deployment.js'
import { setupNewClient } from './helpers/up-client.js'
import { createNewClient, setupNewClient } from './helpers/up-client.js'
import { randomFile } from './helpers/random.js'
import { getTableItem, getAllTableRows } from './helpers/table.js'

test.before(t => {
t.context = {
apiEndpoint: getApiEndpoint(),
metricsDynamo: getDynamoDb('admin-metrics'),
spaceMetricsDynamo: getDynamoDb('space-metrics')
spaceMetricsDynamo: getDynamoDb('space-metrics'),
rateLimitsDynamo: getDynamoDb('rate-limit')
}
})

Expand Down Expand Up @@ -65,6 +70,48 @@ test('upload-api /metrics', async t => {
t.is((body.match(/w3up_invocations_total/g) || []).length, 6)
})

test('authorizations can be blocked by email or domain', async t => {
const client = await createNewClient(t.context.apiEndpoint)

// test email blocking
await t.context.rateLimitsDynamo.client.send(new PutItemCommand({
TableName: t.context.rateLimitsDynamo.tableName,
Item: marshall({
id: Math.random().toString(10),
subject: 'travis@example.com',
rate: 0
})
}))

// it would be nice to use t.throwsAsync here, but that doesn't work with errors that aren't exceptions: https://github.com/avajs/ava/issues/2517
try {
await client.authorize('travis@example.com')
t.fail('authorize should fail with a blocked email address')
} catch (e) {
t.is(e.name, 'AccountBlocked')
t.is(e.message, 'Account identified by did:mailto:example.com:travis is blocked')
}

// test domain blocking
await t.context.rateLimitsDynamo.client.send(new PutItemCommand({
TableName: t.context.rateLimitsDynamo.tableName,
Item: marshall({
id: Math.random().toString(10),
subject: 'example2.com',
rate: 0
})
}))

// it would be nice to use t.throwsAsync here, but that doesn't work with errors that aren't exceptions: https://github.com/avajs/ava/issues/2517
try {
await client.authorize('travis@example2.com')
t.fail('authorize should fail with a blocked domain')
} catch (e) {
t.is(e.name, 'AccountBlocked')
t.is(e.message, 'Account identified by did:mailto:example2.com:travis is blocked')
}
})

// Integration test for all flow from uploading a file to Kinesis events consumers and replicator
test('w3infra integration flow', async t => {
const client = await setupNewClient(t.context.apiEndpoint)
Expand Down Expand Up @@ -222,6 +269,21 @@ test('w3infra integration flow', async t => {
)
})
}

// verify that blocking a space makes it impossible to upload a file to it
await t.context.rateLimitsDynamo.client.send(new PutItemCommand({
TableName: t.context.rateLimitsDynamo.tableName,
Item: marshall({
id: Math.random().toString(10),
subject: client.currentSpace().did(),
rate: 0
})
}))
const uploadError = await t.throwsAsync(async () => {
await client.uploadFile(await randomFile(100))
})

t.is(uploadError.message, 'failed store/add invocation')
})

/**
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
"version": "0.0.0",
"type": "module",
"scripts": {
"test": "ava --verbose --timeout=60s test/{*.test.js,**/*.test.js}"
"test": "ava --verbose --timeout=60s --no-worker-threads --serial test/{*.test.js,**/*.test.js}"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.226.0",
"@aws-sdk/client-eventbridge": "^3.218.0",
"@sentry/serverless": "^7.22.0",
"@web3-storage/capabilities": "^6.0.1",
"@web3-storage/capabilities": "^9.0.0",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
Expand Down
55 changes: 0 additions & 55 deletions upload-api/access.js

This file was deleted.

15 changes: 6 additions & 9 deletions upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { DID, API } from '@ucanto/core'
import { API } from '@ucanto/core'
import * as Server from '@ucanto/server'
import { Kinesis } from '@aws-sdk/client-kinesis'
import * as Sentry from '@sentry/serverless'

import { createAccessClient } from '../access.js'
import { processAgentMessageArchive } from '../ucan-invocation.js'
import { createCarStore } from '../buckets/car-store.js'
import { createDudewhereStore } from '../buckets/dudewhere-store.js'
Expand All @@ -22,6 +21,7 @@ import { createDelegationsTable } from '../tables/delegations.js'
import { createDelegationsStore } from '../buckets/delegations-store.js'
import { createSubscriptionTable } from '../tables/subscription.js'
import { createConsumerTable } from '../tables/consumer.js'
import { createRateLimitTable } from '../tables/rate-limit.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -85,6 +85,7 @@ export async function ucanInvocationRouter(request) {
CONSUMER_TABLE_NAME: consumerTableName = '',
SUBSCRIPTION_TABLE_NAME: subscriptionTableName = '',
DELEGATION_TABLE_NAME: delegationTableName = '',
RATE_LIMIT_TABLE_NAME: rateLimitTableName = '',
R2_ENDPOINT: r2DelegationBucketEndpoint = '',
R2_ACCESS_KEY_ID: r2DelegationBucketAccessKeyId = '',
R2_SECRET_ACCESS_KEY: r2DelegationBucketSecretAccessKey = '',
Expand Down Expand Up @@ -123,6 +124,7 @@ export async function ucanInvocationRouter(request) {
const consumerTable = createConsumerTable(AWS_REGION, consumerTableName, {
endpoint: dbEndpoint
});
const rateLimitsStorage = createRateLimitTable(AWS_REGION, rateLimitTableName)
const provisionsStorage = useProvisionStore(subscriptionTable, consumerTable, [
/** @type {import('@web3-storage/upload-api').ServiceDID} */
(accessServiceDID)
Expand All @@ -145,18 +147,13 @@ export async function ucanInvocationRouter(request) {
uploadTable: createUploadTable(AWS_REGION, uploadTableName, {
endpoint: dbEndpoint,
}),
access: createAccessClient(
serviceSigner,
DID.parse(accessServiceDID),
provisionsStorage,
delegationsStorage
),
signer: serviceSigner,
// TODO: we should set URL from a different env var, doing this for now to avoid that refactor - tracking in https://github.com/web3-storage/w3infra/issues/209
url: new URL(accessServiceURL),
email: new Email({ token: postmarkToken }),
provisionsStorage,
delegationsStorage
delegationsStorage,
rateLimitsStorage
})

const processingCtx = {
Expand Down
3 changes: 3 additions & 0 deletions upload-api/functions/validate-email.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
ValidateEmailError,
PendingValidateEmail,
} from '../html.jsx'
import { createRateLimitTable } from '../tables/rate-limit.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -66,6 +67,7 @@ function createAuthorizeContext () {
ACCESS_SERVICE_DID = '',
AWS_REGION = '',
DELEGATION_TABLE_NAME = '',
RATE_LIMIT_TABLE_NAME = '',
R2_ENDPOINT = '',
R2_ACCESS_KEY_ID = '',
R2_SECRET_ACCESS_KEY = '',
Expand Down Expand Up @@ -102,6 +104,7 @@ function createAuthorizeContext () {
/** @type {import('@web3-storage/upload-api').ServiceDID} */
(ACCESS_SERVICE_DID)
]),
rateLimitsStorage: createRateLimitTable(AWS_REGION, RATE_LIMIT_TABLE_NAME)
}
}

Expand Down
9 changes: 5 additions & 4 deletions upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"version": "3.0.0",
"type": "module",
"scripts": {
"test": "ava --node-arguments='--experimental-fetch' --verbose --timeout=60s '**/*.test.js'"
"test": "ava --node-arguments='--experimental-fetch' --verbose --timeout=60s --no-worker-threads --serial '**/*.test.js'"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.226.0",
Expand All @@ -23,10 +23,11 @@
"@ucanto/validator": "^8.0.0",
"@web-std/fetch": "^4.1.0",
"@web3-storage/access": "^14.0.0",
"@web3-storage/capabilities": "^6.0.1",
"@web3-storage/upload-api": "^4.1.0",
"@web3-storage/capabilities": "^9.0.0",
"@web3-storage/upload-api": "^5.0.0",
"@web3-storage/w3infra-ucan-invocation": "*",
"multiformats": "^11.0.1",
"nanoid": "^4.0.2",
"preact": "^10.14.1",
"preact-render-to-string": "^5.2.6",
"prom-client": "^14.2.0",
Expand All @@ -38,9 +39,9 @@
"@types/aws-lambda": "^8.10.108",
"@ucanto/core": "^8.0.0",
"@web-std/blob": "3.0.4",
"@web3-storage/sigv4": "^1.0.2",
"ava": "^4.3.3",
"aws-lambda-test-utils": "^1.3.0",
"nanoid": "^4.0.0",
"testcontainers": "^8.13.0"
},
"engines": {
Expand Down
Loading

0 comments on commit 3b032d1

Please sign in to comment.