diff --git a/.changelog/1386.txt b/.changelog/1386.txt
new file mode 100644
index 00000000000..64d0db476af
--- /dev/null
+++ b/.changelog/1386.txt
@@ -0,0 +1,3 @@
+```release-note:enhancement
+streams: adds support for stream create parameters for tus upload initiate
+```
diff --git a/.changelog/1468.txt b/.changelog/1468.txt
new file mode 100644
index 00000000000..3c91f640125
--- /dev/null
+++ b/.changelog/1468.txt
@@ -0,0 +1,3 @@
+```release-note:enhancement
+logpush: Add support for Output Options
+```
diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
deleted file mode 100644
index 39f43fba464..00000000000
--- a/.github/workflows/coverage.yml
+++ /dev/null
@@ -1,19 +0,0 @@
-on: [pull_request]
-name: Coverage
-jobs:
-  coverage:
-    runs-on: ubuntu-latest
-    steps:
-      - name: Checkout code
-        uses: actions/checkout@v4
-      - uses: actions/setup-go@v5
-        with:
-          go-version-file: 'go.mod'
-      - uses: actions/cache@v4
-        with:
-          path: ~/go/pkg/mod
-          key: ${{ runner.os }}-go-${{ hashFiles('**/go.mod') }}-${{ hashFiles('**/go.sum') }}
-      - name: Run coverage
-        run: go test ./... -coverprofile=coverage.txt -covermode=atomic
-      - name: Upload coverage to Codecov
-        uses: codecov/codecov-action@v4.0.0-beta.3
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 31cd7c6da95..1caba0c2bf0
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,8 @@
 ENHANCEMENTS:
 
 * dlp: add support for EDM and CWL datasets ([#1485](https://github.com/cloudflare/cloudflare-go/issues/1485))
+* logpush: Add support for Output Options ([#1468](https://github.com/cloudflare/cloudflare-go/issues/1468))
+* streams: adds support for stream create parameters for tus upload initiate ([#1386](https://github.com/cloudflare/cloudflare-go/issues/1386))
 
 BUG FIXES:
diff --git a/logpush.go b/logpush.go
index 3fbb1af5059..34e32e9cb58
--- a/logpush.go
+++ b/logpush.go
@@ -12,22 +12,23 @@ import (
 
 // LogpushJob describes a Logpush job.
 type LogpushJob struct {
-	ID                       int                `json:"id,omitempty"`
-	Dataset                  string             `json:"dataset"`
-	Enabled                  bool               `json:"enabled"`
-	Kind                     string             `json:"kind,omitempty"`
-	Name                     string             `json:"name"`
-	LogpullOptions           string             `json:"logpull_options"`
-	DestinationConf          string             `json:"destination_conf"`
-	OwnershipChallenge       string             `json:"ownership_challenge,omitempty"`
-	LastComplete             *time.Time         `json:"last_complete,omitempty"`
-	LastError                *time.Time         `json:"last_error,omitempty"`
-	ErrorMessage             string             `json:"error_message,omitempty"`
-	Frequency                string             `json:"frequency,omitempty"`
-	Filter                   *LogpushJobFilters `json:"filter,omitempty"`
-	MaxUploadBytes           int                `json:"max_upload_bytes,omitempty"`
-	MaxUploadRecords         int                `json:"max_upload_records,omitempty"`
-	MaxUploadIntervalSeconds int                `json:"max_upload_interval_seconds,omitempty"`
+	ID                       int                   `json:"id,omitempty"`
+	Dataset                  string                `json:"dataset"`
+	Enabled                  bool                  `json:"enabled"`
+	Kind                     string                `json:"kind,omitempty"`
+	Name                     string                `json:"name"`
+	LogpullOptions           string                `json:"logpull_options,omitempty"`
+	OutputOptions            *LogpushOutputOptions `json:"output_options,omitempty"`
+	DestinationConf          string                `json:"destination_conf"`
+	OwnershipChallenge       string                `json:"ownership_challenge,omitempty"`
+	LastComplete             *time.Time            `json:"last_complete,omitempty"`
+	LastError                *time.Time            `json:"last_error,omitempty"`
+	ErrorMessage             string                `json:"error_message,omitempty"`
+	Frequency                string                `json:"frequency,omitempty"`
+	Filter                   *LogpushJobFilters    `json:"filter,omitempty"`
+	MaxUploadBytes           int                   `json:"max_upload_bytes,omitempty"`
+	MaxUploadRecords         int                   `json:"max_upload_records,omitempty"`
+	MaxUploadIntervalSeconds int                   `json:"max_upload_interval_seconds,omitempty"`
 }
 
 type LogpushJobFilters struct {
@@ -63,6 +64,21 @@ type LogpushJobFilter struct {
 	Value interface{} `json:"value,omitempty"`
 }
 
+type LogpushOutputOptions struct {
+	FieldNames      []string `json:"field_names"`
+	OutputType      string   `json:"output_type,omitempty"`
+	BatchPrefix     string   `json:"batch_prefix,omitempty"`
+	BatchSuffix     string   `json:"batch_suffix,omitempty"`
+	RecordPrefix    string   `json:"record_prefix,omitempty"`
+	RecordSuffix    string   `json:"record_suffix,omitempty"`
+	RecordTemplate  string   `json:"record_template,omitempty"`
+	RecordDelimiter string   `json:"record_delimiter,omitempty"`
+	FieldDelimiter  string   `json:"field_delimiter,omitempty"`
+	TimestampFormat string   `json:"timestamp_format,omitempty"`
+	SampleRate      float64  `json:"sample_rate,omitempty"`
+	CVE202144228    *bool    `json:"CVE-2021-44228,omitempty"`
+}
+
 // LogpushJobsResponse is the API response, containing an array of Logpush Jobs.
 type LogpushJobsResponse struct {
 	Response
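An aside on the new types above: `OutputOptions` is pointer-typed with `omitempty`, so a nil value keeps `output_options` out of the serialized job entirely. A minimal sketch of what a job carrying the new struct marshals to; the job values are illustrative, not taken from this diff's tests:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	cloudflare "github.com/cloudflare/cloudflare-go"
)

func main() {
	// Illustrative job: emit three fields with RFC 3339 timestamps.
	job := cloudflare.LogpushJob{
		Dataset:         "http_requests",
		Name:            "example.com",
		DestinationConf: "s3://mybucket/logs?region=us-west-2",
		OutputOptions: &cloudflare.LogpushOutputOptions{
			FieldNames:      []string{"RayID", "ClientIP", "EdgeStartTimestamp"},
			TimestampFormat: "rfc3339",
		},
	}

	b, err := json.Marshal(job)
	if err != nil {
		log.Fatal(err)
	}
	// Prints the job JSON including the nested "output_options" object;
	// leaving OutputOptions nil would omit the key entirely.
	fmt.Println(string(b))
}
```

Relaxing `logpull_options` to `omitempty` complements this: a job configured through `output_options` no longer emits an empty `logpull_options` key alongside it.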
@@ -323,19 +339,20 @@ func (filter *LogpushJobFilter) Validate() error {
 }
 
 type CreateLogpushJobParams struct {
-	Dataset                  string             `json:"dataset"`
-	Enabled                  bool               `json:"enabled"`
-	Kind                     string             `json:"kind,omitempty"`
-	Name                     string             `json:"name"`
-	LogpullOptions           string             `json:"logpull_options"`
-	DestinationConf          string             `json:"destination_conf"`
-	OwnershipChallenge       string             `json:"ownership_challenge,omitempty"`
-	ErrorMessage             string             `json:"error_message,omitempty"`
-	Frequency                string             `json:"frequency,omitempty"`
-	Filter                   *LogpushJobFilters `json:"filter,omitempty"`
-	MaxUploadBytes           int                `json:"max_upload_bytes,omitempty"`
-	MaxUploadRecords         int                `json:"max_upload_records,omitempty"`
-	MaxUploadIntervalSeconds int                `json:"max_upload_interval_seconds,omitempty"`
+	Dataset                  string                `json:"dataset"`
+	Enabled                  bool                  `json:"enabled"`
+	Kind                     string                `json:"kind,omitempty"`
+	Name                     string                `json:"name"`
+	LogpullOptions           string                `json:"logpull_options,omitempty"`
+	OutputOptions            *LogpushOutputOptions `json:"output_options,omitempty"`
+	DestinationConf          string                `json:"destination_conf"`
+	OwnershipChallenge       string                `json:"ownership_challenge,omitempty"`
+	ErrorMessage             string                `json:"error_message,omitempty"`
+	Frequency                string                `json:"frequency,omitempty"`
+	Filter                   *LogpushJobFilters    `json:"filter,omitempty"`
+	MaxUploadBytes           int                   `json:"max_upload_bytes,omitempty"`
+	MaxUploadRecords         int                   `json:"max_upload_records,omitempty"`
+	MaxUploadIntervalSeconds int                   `json:"max_upload_interval_seconds,omitempty"`
 }
 
 type ListLogpushJobsParams struct{}
@@ -349,22 +366,23 @@ type GetLogpushFieldsParams struct {
 }
 
 type UpdateLogpushJobParams struct {
-	ID                       int                `json:"-"`
-	Dataset                  string             `json:"dataset"`
-	Enabled                  bool               `json:"enabled"`
-	Kind                     string             `json:"kind,omitempty"`
-	Name                     string             `json:"name"`
-	LogpullOptions           string             `json:"logpull_options"`
-	DestinationConf          string             `json:"destination_conf"`
-	OwnershipChallenge       string             `json:"ownership_challenge,omitempty"`
-	LastComplete             *time.Time         `json:"last_complete,omitempty"`
-	LastError                *time.Time         `json:"last_error,omitempty"`
-	ErrorMessage             string             `json:"error_message,omitempty"`
-	Frequency                string             `json:"frequency,omitempty"`
-	Filter                   *LogpushJobFilters `json:"filter,omitempty"`
-	MaxUploadBytes           int                `json:"max_upload_bytes,omitempty"`
-	MaxUploadRecords         int                `json:"max_upload_records,omitempty"`
-	MaxUploadIntervalSeconds int                `json:"max_upload_interval_seconds,omitempty"`
+	ID                       int                   `json:"-"`
+	Dataset                  string                `json:"dataset"`
+	Enabled                  bool                  `json:"enabled"`
+	Kind                     string                `json:"kind,omitempty"`
+	Name                     string                `json:"name"`
+	LogpullOptions           string                `json:"logpull_options,omitempty"`
+	OutputOptions            *LogpushOutputOptions `json:"output_options,omitempty"`
+	DestinationConf          string                `json:"destination_conf"`
+	OwnershipChallenge       string                `json:"ownership_challenge,omitempty"`
+	LastComplete             *time.Time            `json:"last_complete,omitempty"`
+	LastError                *time.Time            `json:"last_error,omitempty"`
+	ErrorMessage             string                `json:"error_message,omitempty"`
+	Frequency                string                `json:"frequency,omitempty"`
+	Filter                   *LogpushJobFilters    `json:"filter,omitempty"`
+	MaxUploadBytes           int                   `json:"max_upload_bytes,omitempty"`
+	MaxUploadRecords         int                   `json:"max_upload_records,omitempty"`
+	MaxUploadIntervalSeconds int                   `json:"max_upload_interval_seconds,omitempty"`
 }
 
 type ValidateLogpushOwnershipChallengeParams struct {
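With all three parameter structs now carrying the field, creating a job that uses output options could look like the sketch below. It assumes the package's existing `CreateLogpushJob(ctx, rc, params)` entry point and `ZoneIdentifier` helper; the API token and zone ID are placeholders:

```go
package main

import (
	"context"
	"log"

	cloudflare "github.com/cloudflare/cloudflare-go"
)

func main() {
	// Placeholder credentials and zone ID.
	api, err := cloudflare.NewWithAPIToken("your-api-token")
	if err != nil {
		log.Fatal(err)
	}

	job, err := api.CreateLogpushJob(context.Background(),
		cloudflare.ZoneIdentifier("your-zone-id"),
		cloudflare.CreateLogpushJobParams{
			Dataset:         "http_requests",
			Name:            "example.com",
			DestinationConf: "s3://mybucket/logs?region=us-west-2",
			// OutputOptions replaces LogpullOptions as the way to shape
			// this job's output; the two are alternative configurations.
			OutputOptions: &cloudflare.LogpushOutputOptions{
				FieldNames:      []string{"RayID", "ClientIP", "EdgeStartTimestamp"},
				TimestampFormat: "rfc3339",
			},
		})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("created Logpush job %d", job.ID)
}
```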
"high", "max_upload_bytes": 5000000 } +` + serverLogpushJobWithOutputOptionsDescription = `{ + "id": %d, + "dataset": "http_requests", + "kind": "", + "enabled": false, + "name": "example.com", + "output_options": { + "field_names":[ + "RayID", + "ClientIP", + "EdgeStartTimestamp" + ], + "timestamp_format": "rfc3339" + }, + "destination_conf": "s3://mybucket/logs?region=us-west-2", + "last_complete": "%[2]s", + "last_error": "%[2]s", + "error_message": "test", + "frequency": "high", + "max_upload_bytes": 5000000 + } ` serverEdgeLogpushJobDescription = `{ "id": %d, @@ -76,6 +98,26 @@ var ( Frequency: "high", MaxUploadBytes: 5000000, } + expectedLogpushJobWithOutputOptionsStruct = LogpushJob{ + ID: jobID, + Dataset: "http_requests", + Enabled: false, + Name: "example.com", + OutputOptions: &LogpushOutputOptions{ + FieldNames: []string{ + "RayID", + "ClientIP", + "EdgeStartTimestamp", + }, + TimestampFormat: "rfc3339", + }, + DestinationConf: "s3://mybucket/logs?region=us-west-2", + LastComplete: &testLogpushTimestamp, + LastError: &testLogpushTimestamp, + ErrorMessage: "test", + Frequency: "high", + MaxUploadBytes: 5000000, + } expectedEdgeLogpushJobStruct = LogpushJob{ ID: jobID, Dataset: "http_requests", @@ -138,6 +180,10 @@ func TestGetLogpushJob(t *testing.T) { result: serverLogpushJobDescription, want: expectedLogpushJobStruct, }, + "core logpush job with output options": { + result: serverLogpushJobWithOutputOptionsDescription, + want: expectedLogpushJobWithOutputOptionsStruct, + }, "edge logpush job": { result: serverEdgeLogpushJobDescription, want: expectedEdgeLogpushJobStruct, @@ -198,6 +244,40 @@ func TestCreateLogpushJob(t *testing.T) { result: serverLogpushJobDescription, want: expectedLogpushJobStruct, }, + "core logpush job with output options": { + newJob: CreateLogpushJobParams{ + Dataset: "http_requests", + Enabled: false, + Name: "example.com", + OutputOptions: &LogpushOutputOptions{ + FieldNames: []string{ + "RayID", + "ClientIP", + "EdgeStartTimestamp", + }, + TimestampFormat: "rfc3339", + }, + DestinationConf: "s3://mybucket/logs?region=us-west-2", + MaxUploadRecords: 1000, + }, + payload: `{ + "dataset": "http_requests", + "enabled":false, + "name":"example.com", + "output_options": { + "field_names":[ + "RayID", + "ClientIP", + "EdgeStartTimestamp" + ], + "timestamp_format": "rfc3339" + }, + "destination_conf":"s3://mybucket/logs?region=us-west-2", + "max_upload_records": 1000 + }`, + result: serverLogpushJobWithOutputOptionsDescription, + want: expectedLogpushJobWithOutputOptionsStruct, + }, "edge logpush job": { newJob: CreateLogpushJobParams{ Dataset: "http_requests", diff --git a/stream.go b/stream.go index ebb175cb865..38806499624 100644 --- a/stream.go +++ b/stream.go @@ -209,10 +209,12 @@ type StreamInitiateTUSUploadResponse struct { type TUSUploadMetadata struct { Name string `json:"name,omitempty"` + MaxDurationSeconds int `json:"maxDurationSeconds,omitempty"` RequireSignedURLs bool `json:"requiresignedurls,omitempty"` AllowedOrigins string `json:"allowedorigins,omitempty"` ThumbnailTimestampPct float64 `json:"thumbnailtimestamppct,omitempty"` ScheduledDeletion *time.Time `json:"scheduledDeletion,omitempty"` + Expiry *time.Time `json:"expiry,omitempty"` Watermark string `json:"watermark,omitempty"` } @@ -221,6 +223,9 @@ func (t TUSUploadMetadata) ToTUSCsv() (string, error) { if t.Name != "" { metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "name", base64.StdEncoding.EncodeToString([]byte(t.Name)))) } + if t.MaxDurationSeconds != 0 { + 
diff --git a/stream.go b/stream.go
index ebb175cb865..38806499624
--- a/stream.go
+++ b/stream.go
@@ -209,10 +209,12 @@ type StreamInitiateTUSUploadResponse struct {
 
 type TUSUploadMetadata struct {
 	Name                  string     `json:"name,omitempty"`
+	MaxDurationSeconds    int        `json:"maxDurationSeconds,omitempty"`
 	RequireSignedURLs     bool       `json:"requiresignedurls,omitempty"`
 	AllowedOrigins        string     `json:"allowedorigins,omitempty"`
 	ThumbnailTimestampPct float64    `json:"thumbnailtimestamppct,omitempty"`
 	ScheduledDeletion     *time.Time `json:"scheduledDeletion,omitempty"`
+	Expiry                *time.Time `json:"expiry,omitempty"`
 	Watermark             string     `json:"watermark,omitempty"`
 }
 
@@ -221,6 +223,9 @@ func (t TUSUploadMetadata) ToTUSCsv() (string, error) {
 	if t.Name != "" {
 		metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "name", base64.StdEncoding.EncodeToString([]byte(t.Name))))
 	}
+	if t.MaxDurationSeconds != 0 {
+		metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "maxDurationSeconds", base64.StdEncoding.EncodeToString([]byte(strconv.Itoa(t.MaxDurationSeconds)))))
+	}
 	if t.RequireSignedURLs {
 		metadataValues = append(metadataValues, "requiresignedurls")
 	}
@@ -233,6 +238,9 @@ func (t TUSUploadMetadata) ToTUSCsv() (string, error) {
 	if t.ScheduledDeletion != nil {
 		metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "scheduledDeletion", base64.StdEncoding.EncodeToString([]byte(t.ScheduledDeletion.Format(time.RFC3339)))))
 	}
+	if t.Expiry != nil {
+		metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "expiry", base64.StdEncoding.EncodeToString([]byte(t.Expiry.Format(time.RFC3339)))))
+	}
 	if t.Watermark != "" {
 		metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "watermark", base64.StdEncoding.EncodeToString([]byte(t.Watermark))))
 	}
diff --git a/stream_test.go b/stream_test.go
index fec0d4cc00f..510e2bb5a7e
--- a/stream_test.go
+++ b/stream_test.go
@@ -581,10 +581,21 @@ func TestStream_TUSUploadMetadataToTUSCsv(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41,scheduledDeletion MjAyMy0xMC0wMVQwMjoyMDowMFo=", csv)
 
+	expiry, _ := time.Parse(time.RFC3339, "2023-09-25T02:45:00Z")
+	md.Expiry = &expiry
+	csv, err = md.ToTUSCsv()
+	assert.NoError(t, err)
+	assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41,scheduledDeletion MjAyMy0xMC0wMVQwMjoyMDowMFo=,expiry MjAyMy0wOS0yNVQwMjo0NTowMFo=", csv)
+
 	md.Watermark = "watermark-profile-uid"
 	csv, err = md.ToTUSCsv()
 	assert.NoError(t, err)
-	assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41,scheduledDeletion MjAyMy0xMC0wMVQwMjoyMDowMFo=,watermark d2F0ZXJtYXJrLXByb2ZpbGUtdWlk", csv)
+	assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41,scheduledDeletion MjAyMy0xMC0wMVQwMjoyMDowMFo=,expiry MjAyMy0wOS0yNVQwMjo0NTowMFo=,watermark d2F0ZXJtYXJrLXByb2ZpbGUtdWlk", csv)
+
+	md.MaxDurationSeconds = 300
+	csv, err = md.ToTUSCsv()
+	assert.NoError(t, err)
+	assert.Equal(t, "name dGVzdC5tcDQ=,maxDurationSeconds MzAw,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41,scheduledDeletion MjAyMy0xMC0wMVQwMjoyMDowMFo=,expiry MjAyMy0wOS0yNVQwMjo0NTowMFo=,watermark d2F0ZXJtYXJrLXByb2ZpbGUtdWlk", csv)
 
 	// empty metadata should return empty string
 	md = TUSUploadMetadata{}
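End to end, the two new metadata fields flow through `ToTUSCsv` into the comma-separated `key base64(value)` pairs that a TUS client sends in its `Upload-Metadata` request header. A short sketch with illustrative values:

```go
package main

import (
	"fmt"
	"log"
	"time"

	cloudflare "github.com/cloudflare/cloudflare-go"
)

func main() {
	// Illustrative values: reserve room for five minutes of video and
	// mark the upload as expiring two hours from now.
	expiry := time.Now().Add(2 * time.Hour).UTC()

	md := cloudflare.TUSUploadMetadata{
		Name:               "test.mp4",
		MaxDurationSeconds: 300,
		Expiry:             &expiry,
	}

	header, err := md.ToTUSCsv()
	if err != nil {
		log.Fatal(err)
	}
	// e.g. "name dGVzdC5tcDQ=,maxDurationSeconds MzAw,expiry ..."
	fmt.Println("Upload-Metadata:", header)
}
```

Consistent with the other optional keys, `maxDurationSeconds` is skipped when the field is zero and `expiry` when the pointer is nil.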