Skip to content

Commit

Permalink
Reject jobs that are submitted by other jobs
Browse files Browse the repository at this point in the history
Now that we have networking, it is possible for networked jobs to run
Bacalhau themselves and trigger new jobs. Whilst this is a powerful
feature, it can also lead to a “fork bomb”-style exhaustion attack as
jobs could clone themselves with ever increasing concurrency. This is
particularly dangerous as it would require taking down a majority of
the nodes on the network to prevent the jobs from respawning.

Now, we append a header to all outbound HTTP requests made by a job
using HTTP networking that signals that the HTTP request is coming
from a running job. We configure the submission endpoint to reject
any job creation request that contains this header. In this way, we
now prevent jobs from creating other jobs.

When we have paid jobs, this is less of an issue as the payment
channel will eventually exhaust. We can later selectively add the
header only for unpaid jobs, but for now it covers all jobs.
  • Loading branch information
simonwo committed Jan 9, 2023
1 parent e76e6ef commit 9eaca5b
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 49 deletions.
5 changes: 5 additions & 0 deletions pkg/executor/docker/gateway/gateway.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ for IFACE in $(ip --json address show | jq -rc '.[] | .ifname'); do
tc qdisc add dev $IFACE root tbf rate 1mbit burst 32kbit latency 10sec
done

# Add Bacalhau job ID to outgoing requests. We can use this to detect jobs
# trying to spawn other jobs.
echo request_header_access X-Bacalhau-Job-ID deny all > /etc/squid/conf.d/bac-job.conf
echo request_header_add X-Bacalhau-Job-ID "$BACALHAU_JOB_ID" all >> /etc/squid/conf.d/bac-job.conf

# Now that everything is configured, run Squid.
squid -d2
sleep 1
Expand Down
9 changes: 7 additions & 2 deletions pkg/publicapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ const APIShortTimeoutSeconds = 10

// APIClient is a utility for interacting with a node's API server.
type APIClient struct {
BaseURI string
BaseURI string
DefaultHeaders map[string]string

client *http.Client
}

// NewAPIClient returns a new client for a node's API server.
func NewAPIClient(baseURI string) *APIClient {
return &APIClient{
BaseURI: baseURI,
BaseURI: baseURI,
DefaultHeaders: map[string]string{},

client: &http.Client{
Timeout: 300 * time.Second,
Expand Down Expand Up @@ -327,6 +329,9 @@ func (apiClient *APIClient) post(ctx context.Context, api string, reqData, resDa
return bacerrors.NewResponseUnknownError(fmt.Errorf("publicapi: error creating post request: %v", err))
}
req.Header.Set("Content-type", "application/json")
for header, value := range apiClient.DefaultHeaders {
req.Header.Set(header, value)
}
req.Close = true // don't keep connections lying around

var res *http.Response
Expand Down
104 changes: 57 additions & 47 deletions pkg/publicapi/endpoints_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package publicapi

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
Expand All @@ -14,6 +16,7 @@ import (
"github.com/filecoin-project/bacalhau/pkg/publicapi/handlerwrapper"
"github.com/filecoin-project/bacalhau/pkg/system"
"github.com/filecoin-project/bacalhau/pkg/util/targzip"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/attribute"
)
Expand Down Expand Up @@ -49,6 +52,13 @@ func (apiServer *APIServer) submit(res http.ResponseWriter, req *http.Request) {
ctx, span := system.GetSpanFromRequest(req, "pkg/apiServer.submit")
defer span.End()

if otherJobID := req.Header.Get("X-Bacalhau-Job-ID"); otherJobID != "" {
err := fmt.Errorf("rejecting job because HTTP header X-Bacalhau-Job-ID was set")
log.Ctx(ctx).Info().Str("X-Bacalhau-Job-ID", otherJobID).Err(err).Send()
http.Error(res, bacerrors.ErrorToErrorResponse(err), http.StatusBadRequest)
return
}

var submitReq submitRequest
if err := json.NewDecoder(req.Body).Decode(&submitReq); err != nil {
log.Ctx(ctx).Debug().Msgf("====> Decode submitReq error: %s", err)
Expand All @@ -73,56 +83,16 @@ func (apiServer *APIServer) submit(res http.ResponseWriter, req *http.Request) {

// If we have a build context, pin it to IPFS and mount it in the job:
if submitReq.JobCreatePayload.Context != "" {
// TODO: gc pinned contexts
decoded, err := base64.StdEncoding.DecodeString(submitReq.JobCreatePayload.Context)
if err != nil {
log.Ctx(ctx).Debug().Msgf("====> DecodeContext error: %s", err)
errorResponse := bacerrors.ErrorToErrorResponse(err)
http.Error(res, errorResponse, http.StatusInternalServerError)
return
}

tmpDir, err := os.MkdirTemp("", "bacalhau-pin-context-")
if err != nil {
log.Ctx(ctx).Debug().Msgf("====> Create tmp dir error: %s", err)
errorResponse := bacerrors.ErrorToErrorResponse(err)
http.Error(res, errorResponse, http.StatusInternalServerError)
return
}

tarReader := bytes.NewReader(decoded)
err = targzip.Decompress(tarReader, filepath.Join(tmpDir, "context"))
if err != nil {
log.Ctx(ctx).Debug().Msgf("====> Decompress error: %s", err)
errorResponse := bacerrors.ErrorToErrorResponse(err)
http.Error(res, errorResponse, http.StatusInternalServerError)
return
}

// write the "context" for a job to storage
// this is used to upload code files
// we presently just fix on ipfs to do this
ipfsStorage, err := apiServer.StorageProviders.GetStorage(ctx, model.StorageSourceIPFS)
if err != nil {
log.Ctx(ctx).Debug().Msgf("====> GetStorage error: %s", err)
http.Error(res, err.Error(), http.StatusInternalServerError)
return
}
result, err := ipfsStorage.Upload(ctx, filepath.Join(tmpDir, "context"))
spec, err := apiServer.saveInlineTarball(ctx, submitReq.JobCreatePayload.Context)
if err != nil {
log.Ctx(ctx).Debug().Msgf("====> PinContext error: %s", err)
errorResponse := bacerrors.ErrorToErrorResponse(err)
http.Error(res, errorResponse, http.StatusInternalServerError)
log.Ctx(ctx).Error().Err(err).Msg("error saving build context")
http.Error(res, bacerrors.ErrorToErrorResponse(err), http.StatusInternalServerError)
return
}

// NOTE(luke): we could do some kind of storage multiaddr here, e.g.:
// --cid ipfs:abc --cid filecoin:efg
submitReq.JobCreatePayload.Spec.Contexts = append(submitReq.JobCreatePayload.Spec.Contexts, model.StorageSpec{
StorageSource: model.StorageSourceIPFS,
CID: result.CID,
Path: "/job",
})
submitReq.JobCreatePayload.Spec.Contexts = append(
submitReq.JobCreatePayload.Spec.Contexts,
spec,
)
}

j, err := apiServer.Requester.SubmitJob(
Expand All @@ -146,3 +116,43 @@ func (apiServer *APIServer) submit(res http.ResponseWriter, req *http.Request) {
return
}
}

// saveInlineTarball decodes a base64-encoded gzipped tarball (the job's
// inline build "context"), unpacks it into a temporary directory, uploads
// the unpacked tree to IPFS via the configured storage provider, and
// returns a StorageSpec that mounts the uploaded content at /job.
//
// Returns the zero StorageSpec and a wrapped error if decoding,
// decompression, or the upload fails.
func (apiServer *APIServer) saveInlineTarball(ctx context.Context, base64tar string) (model.StorageSpec, error) {
	// TODO: gc pinned contexts
	decoded, err := base64.StdEncoding.DecodeString(base64tar)
	if err != nil {
		return model.StorageSpec{}, errors.Wrap(err, "error base64 decoding context")
	}

	tmpDir, err := os.MkdirTemp("", "bacalhau-pin-context-")
	if err != nil {
		return model.StorageSpec{}, errors.Wrap(err, "error creating temp dir")
	}
	// The scratch directory is only needed until the upload completes;
	// remove it on every exit path so each submission doesn't leak a
	// temp dir on disk.
	defer os.RemoveAll(tmpDir)

	tarReader := bytes.NewReader(decoded)
	if err := targzip.Decompress(tarReader, filepath.Join(tmpDir, "context")); err != nil {
		return model.StorageSpec{}, errors.Wrap(err, "error decompressing context")
	}

	// write the "context" for a job to storage
	// this is used to upload code files
	// we presently just fix on ipfs to do this
	ipfsStorage, err := apiServer.StorageProviders.GetStorage(ctx, model.StorageSourceIPFS)
	if err != nil {
		return model.StorageSpec{}, errors.Wrap(err, "error getting storage provider")
	}

	result, err := ipfsStorage.Upload(ctx, filepath.Join(tmpDir, "context"))
	if err != nil {
		return model.StorageSpec{}, errors.Wrap(err, "error uploading context to IPFS")
	}

	// NOTE(luke): we could do some kind of storage multiaddr here, e.g.:
	// --cid ipfs:abc --cid filecoin:efg
	return model.StorageSpec{
		StorageSource: model.StorageSourceIPFS,
		CID:           result.CID,
		Path:          "/job",
	}, nil
}
11 changes: 11 additions & 0 deletions pkg/test/publicapi/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/bacalhau/pkg/publicapi"
testutils "github.com/filecoin-project/bacalhau/pkg/test/utils"
"github.com/filecoin-project/bacalhau/pkg/types"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -68,6 +69,16 @@ func (s *ServerSuite) TestList() {
require.Len(s.T(), jobs, 1)
}

// TestSubmitRejectsJobWithSigilHeader checks that a submission carrying the
// X-Bacalhau-Job-ID header — the marker added to outbound requests made from
// inside a running job — is refused by the API server.
func (s *ServerSuite) TestSubmitRejectsJobWithSigilHeader() {
	job := testutils.MakeNoopJob()

	id, err := uuid.NewRandom()
	require.NoError(s.T(), err)
	s.client.DefaultHeaders["X-Bacalhau-Job-ID"] = id.String()

	_, err = s.client.Submit(context.Background(), job, nil)
	require.Error(s.T(), err)
}

func (s *ServerSuite) TestHealthz() {
rawHealthData := s.testEndpoint(s.T(), "/healthz", "FreeSpace")

Expand Down

0 comments on commit 9eaca5b

Please sign in to comment.