Skip to content

Commit

Permalink
fix: use proper status code to handle warehouse process (#2659)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Nov 7, 2022
1 parent 0312c55 commit a53657d
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 6 deletions.
22 changes: 16 additions & 6 deletions router/batchrouter/batchrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
stdjson "encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -58,7 +59,6 @@ var (
mainLoopSleep, diagnosisTickerTime time.Duration
uploadFreqInS int64
objectStorageDestinations []string
warehouseURL string
warehouseServiceFailedTime time.Time
warehouseServiceFailedTimeLock sync.RWMutex
warehouseServiceMaxRetryTime time.Duration
Expand Down Expand Up @@ -120,6 +120,7 @@ type HandleT struct {
successfulJobCount stats.Measurement
failedJobCount stats.Measurement
abortedJobCount stats.Measurement
warehouseURL string

backgroundGroup *errgroup.Group
backgroundCtx context.Context
Expand Down Expand Up @@ -1124,16 +1125,23 @@ func (brt *HandleT) postToWarehouse(batchJobs *BatchJobsT, output StorageUploadO
if err != nil {
brt.logger.Errorf("BRT: Failed to marshal WH staging file payload error:%v", err)
}
uri := fmt.Sprintf(`%s/v1/process`, warehouseURL)
uri := fmt.Sprintf(`%s/v1/process`, brt.warehouseURL)
resp, err := brt.netHandle.Post(uri, "application/json; charset=utf-8",
bytes.NewBuffer(jsonPayload))
if err != nil {
brt.logger.Errorf("BRT: Failed to route staging file URL to warehouse service@%v, error:%v", uri, err)
} else {
brt.logger.Infof("BRT: Routed successfully staging file URL to warehouse service@%v", uri)
defer resp.Body.Close()
return
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode == http.StatusOK {
_, err = io.Copy(io.Discard, resp.Body)
brt.logger.Infof("BRT: Routed successfully staging file URL to warehouse service@%v", uri)
} else {
body, _ := io.ReadAll(resp.Body)
err = fmt.Errorf("BRT: Failed to route staging file URL to warehouse service@%v, status: %v, body: %v", uri, resp.Status, string(body))
brt.logger.Error(err)
}
return
}

Expand Down Expand Up @@ -2252,7 +2260,6 @@ func loadConfig() {
config.RegisterInt64ConfigVariable(30, &uploadFreqInS, true, 1, "BatchRouter.uploadFreqInS")
objectStorageDestinations = []string{"S3", "GCS", "AZURE_BLOB", "MINIO", "DIGITAL_OCEAN_SPACES"}
asyncDestinations = []string{"MARKETO_BULK_UPLOAD"}
warehouseURL = misc.GetWarehouseURL()
// Time period for diagnosis ticker
config.RegisterDurationConfigVariable(600, &diagnosisTickerTime, false, time.Second, []string{"Diagnostics.batchRouterTimePeriod", "Diagnostics.batchRouterTimePeriodInS"}...)
config.RegisterDurationConfigVariable(3, &warehouseServiceMaxRetryTime, true, time.Hour, []string{"BatchRouter.warehouseServiceMaxRetryTime", "BatchRouter.warehouseServiceMaxRetryTimeinHr"}...)
Expand Down Expand Up @@ -2295,6 +2302,9 @@ func (brt *HandleT) Setup(backendConfig backendconfig.BackendConfig, jobsDB, err
// error is ignored as context.TODO() is passed, err is not expected.
_ = brt.reporting.WaitForSetup(context.TODO(), types.CORE_REPORTING_CLIENT)
}
if brt.warehouseURL == "" {
brt.warehouseURL = misc.GetWarehouseURL()
}

brt.inProgressMap = map[string]bool{}
brt.lastExecMap = map[string]int64{}
Expand Down
70 changes: 70 additions & 0 deletions router/batchrouter/batchrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ package batchrouter

import (
"context"
jsonb "encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/gofrs/uuid"
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -300,3 +307,66 @@ func assertJobStatus(job *jobsdb.JobT, status *jobsdb.JobStatusT, expectedState,
Expect(status.ExecTime).To(BeTemporally("~", time.Now(), 10*time.Second))
Expect(status.AttemptNum).To(Equal(attemptNum))
}

func TestPostToWarehouse(t *testing.T) {
// TOT: Decouple this test from the actual warehouse
inputs := []struct {
name string
responseCode int
responseBody string
expectedError error
}{
{
name: "should successfully post to warehouse",
responseBody: "OK",
responseCode: http.StatusOK,
},
{
name: "should fail to post to warehouse",
responseCode: http.StatusNotFound,
responseBody: "Not Found",
expectedError: errors.New("BRT: Failed to route staging file URL to warehouse service@%s/v1/process, status: 404 Not Found, body: Not Found"),
},
}
for _, input := range inputs {
t.Run(input.name, func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(input.responseCode)
_, _ = w.Write([]byte(input.responseBody))
}))
t.Cleanup(ts.Close)

job := HandleT{
netHandle: ts.Client(),
logger: logger.NOP,
warehouseURL: ts.URL,
}
batchJobs := BatchJobsT{
Jobs: []*jobsdb.JobT{
{
EventPayload: jsonb.RawMessage(`
{
"receivedAt": "2019-10-12T07:20:50.52Z",
"metadata": {
"columns": {
"id": "string"
},
"table": "tracks"
}
}
`),
WorkspaceId: "test-workspace",
Parameters: jsonb.RawMessage(`{}`),
},
},
BatchDestination: &DestinationT{},
}
err := job.postToWarehouse(&batchJobs, StorageUploadOutput{})
if input.expectedError != nil {
require.Equal(t, fmt.Sprintf(input.expectedError.Error(), ts.URL), err.Error())
} else {
require.NoError(t, err)
}
})
}
}

0 comments on commit a53657d

Please sign in to comment.