Skip to content

Commit

Permalink
move r2-pump from GCP function to binary
Browse files Browse the repository at this point in the history
  • Loading branch information
xtuc committed Mar 5, 2024
1 parent 5d42894 commit 6ecc836
Show file tree
Hide file tree
Showing 14 changed files with 592 additions and 207 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ jobs:
asset_name: git-sync
asset_content_type: application/octet-stream

- uses: actions/upload-release-asset@v1
name: release r2-pump
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
upload_url: ${{ steps.create_release.outputs.upload_url }}
asset_path: ./bin/r2-pump
asset_name: r2-pump
asset_content_type: application/octet-stream

test:
name: Test
runs-on: ubuntu-latest
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ define generate-func-make
endef

.PHONY: all
all: bin/process-version-host bin/git-sync bin/checker \
all: bin/process-version-host bin/git-sync bin/checker bin/r2-pump \
;$(foreach n,${CLOUD_FUNCTIONS},$(call generate-func-make,$n))

bin/checker:
Expand All @@ -22,6 +22,9 @@ bin/process-version-host:
bin/process-version:
go build $(GO_BUILD_ARGS) -o bin/process-version ./cmd/process-version

bin/r2-pump:
go build $(GO_BUILD_ARGS) -o bin/r2-pump ./cmd/r2-pump

.PHONY: schema
schema:
./bin/packages human > schema_human.json
Expand Down
4 changes: 2 additions & 2 deletions audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,14 @@ func WroteAlgolia(ctx context.Context, pkgName string, currVersion string, lastV
return nil
}

func WroteR2(ctx context.Context, pkgName string, version string, keys []string, ext string) error {
func WroteR2(ctx context.Context, pkgName string, version string, keys []string) error {
content := bytes.NewBufferString("")
fmt.Fprint(content, "Files:\n")
for _, key := range keys {
fmt.Fprintf(content, "- %s\n", key)
}

if err := create(ctx, pkgName, version, "r2-publish/"+ext, content); err != nil {
if err := create(ctx, pkgName, version, "r2-publish", content); err != nil {
return errors.Wrap(err, "could not create audit log file")
}
return nil
Expand Down
201 changes: 201 additions & 0 deletions cmd/r2-pump/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package main

import (
"bytes"
"context"
b64 "encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"runtime"
"time"

"github.com/pkg/errors"

"github.com/cdnjs/tools/audit"
"github.com/cdnjs/tools/gcp"
"github.com/cdnjs/tools/metrics"
"github.com/cdnjs/tools/packages"
"github.com/cdnjs/tools/sentry"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"

"cloud.google.com/go/pubsub"
)

var (
PROJECT = os.Getenv("PROJECT")
SUBSCRIPTION = os.Getenv("SUBSCRIPTION")

R2_BUCKET = os.Getenv("R2_BUCKET")
R2_KEY_ID = os.Getenv("R2_KEY_ID")
R2_KEY_SECRET = os.Getenv("R2_KEY_SECRET")
R2_ENDPOINT = os.Getenv("R2_ENDPOINT")
)

func init() {
sentry.Init()
}

func main() {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, PROJECT)
if err != nil {
log.Fatalf("could not create pubsub Client: %v", err)
}
sub := client.Subscription(SUBSCRIPTION)
sub.ReceiveSettings.Synchronous = true
sub.ReceiveSettings.MaxOutstandingMessages = 5
sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()

for {
log.Printf("started consuming messages\n")
if err := consume(ctx, client, sub); err != nil {
log.Fatalf("could not pull messages: %s", err)
}
}
}

type Message struct {
GCSEvent gcp.GCSEvent `json:"gcsEvent"`
}

func consume(ctx context.Context, client *pubsub.Client, sub *pubsub.Subscription) error {
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
log.Printf("received message: %s\n", msg.Data)

msg.Ack()
if err := processMessage(ctx, msg.Data); err != nil {
log.Printf("failed to process message: %s\n", err)
}
})
if err != nil {
return errors.Wrap(err, "could not receive from subscription")
}
return nil
}

func processMessage(ctx context.Context, data []byte) error {
var message Message
if err := json.Unmarshal(data, &message); err != nil {
return errors.Wrap(err, "failed to parse")
}

return invoke(ctx, message.GCSEvent)
}

func invoke(ctx context.Context, e gcp.GCSEvent) error {
sentry.Init()
defer sentry.PanicHandler()

pkgName := e.Metadata["package"].(string)
version := e.Metadata["version"].(string)
log.Printf("Invoke %s %s\n", pkgName, version)

configStr, err := b64.StdEncoding.DecodeString(e.Metadata["config"].(string))
if err != nil {
return fmt.Errorf("could not decode config: %v", err)
}

archive, err := gcp.ReadObject(ctx, e.Bucket, e.Name)
if err != nil {
return fmt.Errorf("could not read object: %v", err)
}

bucket := aws.String(R2_BUCKET)

r2Resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: R2_ENDPOINT,
}, nil
})

cfg, err := config.LoadDefaultConfig(ctx,
config.WithEndpointResolverWithOptions(r2Resolver),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(R2_KEY_ID, R2_KEY_SECRET, "")),
)
if err != nil {
return fmt.Errorf("could not load config: %s", err)
}

s3Client := s3.NewFromConfig(cfg)

keys := make([]string, 0)

onFile := func(name string, r io.Reader) error {
// remove leading slash
name = name[1:]
key := fmt.Sprintf("%s/%s/%s", pkgName, version, name)

content, err := ioutil.ReadAll(r)
if err != nil {
return errors.Wrap(err, "could not read file")
}

keys = append(keys, key)

meta := newMetadata(len(content))

s3Object := s3.PutObjectInput{
Body: bytes.NewReader(content),
Bucket: bucket,
Key: aws.String(key),
Metadata: meta,
}
if err := uploadFile(ctx, s3Client, &s3Object); err != nil {
return errors.Wrap(err, "failed to upload file")
}
return nil
}
if err := gcp.Inflate(bytes.NewReader(archive), onFile); err != nil {
return fmt.Errorf("could not inflate archive: %s", err)
}

if len(keys) == 0 {
log.Printf("%s: no files to publish\n", pkgName)
}

pkg := new(packages.Package)
if err := json.Unmarshal([]byte(configStr), &pkg); err != nil {
return fmt.Errorf("failed to parse config: %s", err)
}

if err := audit.WroteR2(ctx, pkgName, version, keys); err != nil {
log.Printf("failed to audit: %s\n", err)
}
if err := metrics.NewUpdatePublishedR2(); err != nil {
return errors.Wrap(err, "could not report metrics")
}

return nil
}

func newMetadata(size int) map[string]string {
lastModifiedTime := time.Now()
lastModifiedSeconds := lastModifiedTime.UnixNano() / int64(time.Second)
lastModifiedStr := lastModifiedTime.Format(http.TimeFormat)
etag := fmt.Sprintf("%x-%x", lastModifiedSeconds, size)

meta := make(map[string]string)

// https://github.com/cdnjs/origin-worker/blob/ff91d30586c9e924ff919407401dff6f52826b4d/src/index.js#L212-L213
meta["etag"] = etag
meta["last_modified"] = lastModifiedStr

return meta
}

func uploadFile(ctx context.Context, s3Client *s3.Client, obj *s3.PutObjectInput) error {
if _, err := s3Client.PutObject(ctx, obj); err != nil {
return errors.Wrapf(err, "failed to put Object %s", *obj.Key)
}

return nil
}
8 changes: 5 additions & 3 deletions functions/check-pkg-updates/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/cdnjs/tools v0.0.0-00010101000000-000000000000
github.com/cloudflare/cloudflare-go v0.16.0
github.com/cloudflare/cloudflare-go v0.69.0
github.com/pkg/errors v0.9.1
)

Expand All @@ -19,8 +19,10 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-github v17.0.0+incompatible // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.3 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/karrick/godirwalk v1.15.6 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
Expand All @@ -33,7 +35,7 @@ require (
golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/api v0.45.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
Loading

0 comments on commit 6ecc836

Please sign in to comment.