-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.go
88 lines (74 loc) · 2.68 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package handler
import (
"context"
"fmt"
"net/http"
"os"
"time"
"github.com/GoogleCloudPlatform/functions-framework-go/functions"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/tablelandnetwork/basin-storage/pkg/storage"
)
func init() {
// Register a CloudEvent function with the Functions Framework
functions.CloudEvent("Uploader", Uploader)
functions.HTTP("StatusChecker", StatusChecker)
}
// Uploader is the CloudEvent function that is called by the Functions Framework.
// It is triggered by a CloudEvent that is published by the GCS bucket.
// The CloudEvent contains the name of the bucket and the name of the file.
// The file is downloaded from GCS and uploaded to web3.storage.
func Uploader(ctx context.Context, e event.Event) error {
// Set a timeout of 60 minutes, thats the max time a function can run on GCP (gen2)
// we want to ensure larger files can be uploaded
cctx, cancel := context.WithTimeout(ctx, 60*time.Minute)
defer cancel()
// Read config from environment variables
cfg := &storage.UploaderConfig{
W3SToken: os.Getenv("WEB3STORAGE_TOKEN"),
CrdbConn: os.Getenv("CRDB_CONN_STRING"),
}
// Initialize file uploader
u, err := storage.NewFileUploader(cctx, e.Data(), cfg)
if err != nil {
return fmt.Errorf("failed to initialize file uploader: %v", err)
}
// Upload file (from event) to web3.storage
err = u.Upload(cctx)
if err != nil {
return fmt.Errorf("failed to upload file: %v", err)
}
return nil
}
// StatusChecker is the HTTP function that is called by the Functions Framework.
func StatusChecker(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
cfg := &storage.StatusCheckerConfig{
W3SToken: os.Getenv("WEB3STORAGE_TOKEN"),
CrdbConn: os.Getenv("CRDB_CONN_STRING"),
PrivateKey: os.Getenv("PRIVATE_KEY"),
ChainID: os.Getenv("CHAIN_ID"),
BackendURL: "https://api.calibration.node.glif.io/rpc/v1", // TODO: move to config
BasinStorageAddr: "0xaB16d51Fa80EaeAF9668CE102a783237A045FC37", // TODO: move to config
}
if err := r.ParseForm(); err != nil {
errMsg := fmt.Sprintf("failed to parse form: %v", err)
fmt.Println(errMsg)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
sc, err := storage.NewStatusChecker(ctx, cfg)
if err != nil {
errMsg := fmt.Sprintf("failed to initialize status checker: %v", err)
fmt.Println(errMsg) // todo: enbale proper logging
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
if err = sc.ProcessJobs(ctx); err != nil {
errMsg := fmt.Sprintf("failed to process jobs: %v", err)
fmt.Println(errMsg) // todo: enbale proper logging
http.Error(w, errMsg, http.StatusInternalServerError)
return
}
fmt.Fprintln(w, "OK")
}