[Bug]: Windowed Streaming OnTimer State Wiped #32599

Open
2 of 17 tasks
SamStentz opened this issue Sep 30, 2024 · 2 comments

What happened?

On the streaming Dataflow runner, the state read in OnTimer for a processing-time timer is wiped after an indeterminate number of retries (timer re-fires) when windowing is applied to the PCollection.

I believe the compatibility matrix for streaming Dataflow says this should work.

I am adding an unbounded source to the pipeline, separate from this ParDo, to get Dataflow to launch the pipeline as streaming.

Using github.com/apache/beam/sdks/v2 v2.59.0.

donothing.go

package poc

import (
	"context"
	"reflect"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

func init() {
	runtime.RegisterFunction(NewDoNothingTransform)
	runtime.RegisterFunction(DoNothingTransform)
	runtime.RegisterType(reflect.TypeOf((*doNothingTransformFn)(nil)).Elem())
	runtime.RegisterType(reflect.TypeOf((*context.Context)(nil)).Elem())
}

type doNothingTransformFn struct {
	TimerCount      state.Value[int64]
	TimerTimerstamp state.Value[int64]
	Value           state.Value[string]
	OutputState     timers.ProcessingTime
}

func NewDoNothingTransform() *doNothingTransformFn {
	return &doNothingTransformFn{
		TimerCount:      state.MakeValueState[int64]("timerCount"),
		TimerTimerstamp: state.MakeValueState[int64]("timerTimerstamp"),
		Value:           state.MakeValueState[string]("value"),
		OutputState:     timers.InProcessingTime("processingTime"),
	}
}

func (fn *doNothingTransformFn) ProcessElement(ctx context.Context,
	et beam.EventTime,
	sp state.Provider, tp timers.Provider,
	k beam.X,
	v string,
	_ func(string)) {

	log.Infof(ctx, "ProcessElement <%v, %v>", k, v)

	// Set state.
	err := fn.TimerCount.Write(sp, 0)
	if err != nil {
		log.Error(ctx, "couldn't set TimerCount state")
		return
	}
	err = fn.TimerTimerstamp.Write(sp, et.ToTime().UnixMilli())
	if err != nil {
		log.Error(ctx, "couldn't set TimerTimerstamp state")
		return
	}
	err = fn.Value.Write(sp, v)
	if err != nil {
		log.Error(ctx, "couldn't set Value state")
		return
	}
	// Set timer.
	fn.OutputState.Set(tp, time.Now().Add(100*time.Millisecond), timers.WithOutputTimestamp(et.ToTime()))
}

func (fn *doNothingTransformFn) OnTimer(ctx context.Context,
	sp state.Provider, tp timers.Provider,
	k beam.X, timer timers.Context,
	emit func(string)) {
	// Read state.
	tc, tcp, err := fn.TimerCount.Read(sp)
	if err != nil || !tcp {
		log.Error(ctx, "couldn't read TimerCount state")
		return
	}
	timerTimerstamp, tsp, err := fn.TimerTimerstamp.Read(sp)
	if err != nil || !tsp {
		log.Error(ctx, "couldn't read TimerTimerstamp state")
		return
	}
	ts := time.UnixMilli(timerTimerstamp)
	v, vp, err := fn.Value.Read(sp)
	if err != nil || !vp {
		log.Error(ctx, "couldn't read Value state")
		return
	}
	log.Infof(ctx, "OnTimer <%v, %v>: count %d", k, v, tc)
	// terminating condition of 10 calls.
	if tc > 10 {
		emit(v)
		return
	}
	// Set time.
	newTime := time.Now().Add(time.Second)
	fn.OutputState.Set(tp, newTime, timers.WithOutputTimestamp(ts))
	err = fn.TimerCount.Write(sp, tc+1)
	if err != nil {
		log.Errorf(ctx, "error writing timer count: %v", err)
	}
}

func (fn *doNothingTransformFn) FinishBundle(ctx context.Context, _ func(string)) error {
	log.Infof(ctx, "FinishBundle")
	return nil
}

func DoNothingTransform(scope beam.Scope, in beam.PCollection) beam.PCollection {
	return beam.ParDo(scope.Scope("DoNothing"), NewDoNothingTransform(), in)
}

main.go

	beamPipeline, scope := beam.NewPipelineWithRoot()

	col := beam.CreateList(scope, []string{"foo"})
	col = beam.AddFixedKey(scope, col)
	// Windowing causes breakage.
	col = beam.WindowInto(scope, window.NewFixedWindows(5*time.Second), col)
	poc.DoNothingTransform(scope, col)
	
	// Launch pipeline.
	if err := beamx.Run(ctx, beamPipeline); err != nil {
		log.Fatalf(ctx, "failed to invoke pipeline: %v", err)
	}

Dataflow Logs

Note: the newest log entry is at the top.

With windowing

INFO 2024-09-30T16:23:17.580082614Z FinishBundle
ERROR 2024-09-30T16:23:17.579986274Z couldn't read TimerCount state
INFO 2024-09-30T16:23:16.113761331Z FinishBundle
INFO 2024-09-30T16:23:16.112689853Z OnTimer <0, foo>: count 0
INFO 2024-09-30T16:23:12.872389291Z FinishBundle
INFO 2024-09-30T16:23:12.849716490Z ProcessElement <0, foo>

Without windowing

INFO 2024-09-30T16:09:40.198956129Z FinishBundle
INFO 2024-09-30T16:09:40.198888635Z OnTimer <0, foo>: count 11
INFO 2024-09-30T16:09:38.175116421Z FinishBundle
INFO 2024-09-30T16:09:38.174460658Z OnTimer <0, foo>: count 10
INFO 2024-09-30T16:09:36.326253942Z FinishBundle
INFO 2024-09-30T16:09:36.325537572Z OnTimer <0, foo>: count 9
INFO 2024-09-30T16:09:33.816857708Z FinishBundle
INFO 2024-09-30T16:09:33.816091109Z OnTimer <0, foo>: count 8
INFO 2024-09-30T16:09:31.839844866Z FinishBundle
INFO 2024-09-30T16:09:31.839181196Z OnTimer <0, foo>: count 7
INFO 2024-09-30T16:09:29.883666116Z FinishBundle
INFO 2024-09-30T16:09:29.883057679Z OnTimer <0, foo>: count 6
INFO 2024-09-30T16:09:28.548606660Z FinishBundle
INFO 2024-09-30T16:09:28.547808988Z OnTimer <0, foo>: count 5
INFO 2024-09-30T16:09:26.069481677Z FinishBundle
INFO 2024-09-30T16:09:26.068731232Z OnTimer <0, foo>: count 4
INFO 2024-09-30T16:09:23.978673064Z FinishBundle
INFO 2024-09-30T16:09:23.977990148Z OnTimer <0, foo>: count 3
INFO 2024-09-30T16:09:22.185697823Z FinishBundle
INFO 2024-09-30T16:09:22.185073934Z OnTimer <0, foo>: count 2
INFO 2024-09-30T16:09:20.249364760Z FinishBundle
INFO 2024-09-30T16:09:20.248688085Z OnTimer <0, foo>: count 1
INFO 2024-09-30T16:09:17.992121355Z FinishBundle
INFO 2024-09-30T16:09:17.991398544Z OnTimer <0, foo>: count 0
INFO 2024-09-30T16:09:15.002338211Z FinishBundle
INFO 2024-09-30T16:09:14.982766549Z ProcessElement <0, foo>

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

SamStentz commented Sep 30, 2024

A related note on the Prism runner, which I use for testing (I don't see a tag for the Prism runner; I would have added it otherwise).

donothing_test.go

package poc

import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestDoNothingTransform(t *testing.T) {
	beam.Init()
	pipeline, scope, col := ptest.CreateList([]string{"foo"})

	col = beam.AddFixedKey(scope, col)
	col = DoNothingTransform(scope, col)

	// If I comment out this passert, not all retries occur.
	passert.EqualsList(scope, col, []string{"foo"})
	ptest.RunAndValidate(t, pipeline)
}

func TestMain(m *testing.M) {
	ptest.MainWithDefault(m, "prism")
}

I am seeing similar behavior: the logs show that not all retries execute if I don't include the passert clause.

With passert

Running tool: /usr/lib/google-golang/bin/go test -timeout 30s -run ^TestDoNothingTransform$ poc

=== RUN   TestDoNothingTransform
2024/09/27 22:21:21 INFO Serving JobManagement endpoint=localhost:41941
2024/09/27 22:21:21 starting Loopback server at 127.0.0.1:41841
2024/09/27 22:21:21 components:{transforms:{key:"e1" value:{unique_name:"Impulse" spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n1"}}} transforms:{key:"e10" value:{unique_name:"passert.EqualsList/passert.failIfBadEntries" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xe2\x02\n\x19beam:go:transform:dofn:v1\x1a\xc4\x02ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEtQBCpgBCpUBCktnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0vdGVzdGluZy9wYXNzZXJ0LmZhaWxJZkJhZEVudHJpZXMSRggWIgYIFBICCAgiEAgWIggIGBIECBlADyoCCAEiEAgWIggIGBIECBlADyoCCAEiEAgWIggIGBIECBlADyoCCAEqBAgZQAESDAgBEggKBggUEgIICBIKCAYSBgoECBlADxIKCAYSBgoECBlADxIKCAYSBgoECBlADyIFUGFyRG8=\x1aO\n\x02i1\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1\x1aO\n\x02i2\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1\x1aO\n\x02i3\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1"} inputs:{key
:"i0" value:"n11"} inputs:{key:"i1" value:"n8"} inputs:{key:"i2" value:"n9"} inputs:{key:"i3" value:"n10"} environment_id:"go"}} transforms:{key:"e2" value:{unique_name:"beam.createFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xfa\x01\n\x19beam:go:transform:dofn:v1\x1a\xdc\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEoUBCmQSOwgYEjcIGkozZ2l0aHViLmNvbS9hcGFjaGUvYmVhbS9zZGtzL3YyL2dvL3BrZy9iZWFtLmNyZWF0ZUZuGiV7InZhbHVlcyI6WyJBMlp2Ync9PSJdLCJ0eXBlIjoiQ0F3PSJ9EgwIARIICgYIFBICCAgaCAoGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n1"} outputs:{key:"i0" value:"n2"} environment_id:"go"}} transforms:{key:"e3" value:{unique_name:"beam.addFixedKeyFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xee\x01\n\x19beam:go:transform:dofn:v1\x1a\xd0\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEn0KUApOCjhnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0uYWRkRml4ZWRLZXlGbhISCBYiBAgZQA8qAggCKgQIGUAPEgoIARIGCgQIGUAPGhYKFAoECBlACxIECgIIAhIGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n2"} outputs:{key:"i0" value:"n3"} environm
ent_id:"go"}} transforms:{key:"e4" value:{unique_name:"DoNothing/poc.doNothingTransformFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xc2\x03\n\x19beam:go:transform:dofn:v1\x1a\xa4\x03ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEpsCCu8BElkIGBJVCBpKUWdpdGh1Yi5jb20vdmVyaWx5LXNyYy92ZXJpbHkxL2luZ2VzdGlvbi9waXBlbGluZS9zcmMvcGtnL3BvYy5kb05vdGhpbmdUcmFuc2Zvcm1GbhqRAXsiVGltZXJDb3VudCI6eyJLZXkiOiJ0aW1lckNvdW50In0sIlRpbWVyVGltZXJzdGFtcCI6eyJLZXkiOiJ0aW1lclRpbWVyc3RhbXAifSwiVmFsdWUiOnsiS2V5IjoidmFsdWUifSwiT3V0cHV0U3RhdGUiOnsiRmFtaWx5IjoicHJvY2Vzc2luZ1RpbWUifX0SGAgBEhQKBAgZQAsSBgoECBlAExIECgIIDBoGCgQKAggMIgVQYXJEbw==\".\n\ntimerCount\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c6\"3\n\x0ftimerTimerstamp\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c6\")\n\x05value\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c2J\x18\n\x0eprocessingTime\x12\x06\x08\x02\x12\x02c7"} inputs:{key:"i0" value:"n3"} outputs:{key:"i0" value:"n4"} environment_id:"go"}} transforms:{key:"e5" value:{unique_name:"passert.EqualsList/Impulse"
spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n5"}}} transforms:{key:"e6" value:{unique_name:"passert.EqualsList/beam.createFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xfa\x01\n\x19beam:go:transform:dofn:v1\x1a\xdc\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEoUBCmQSOwgYEjcIGkozZ2l0aHViLmNvbS9hcGFjaGUvYmVhbS9zZGtzL3YyL2dvL3BrZy9iZWFtLmNyZWF0ZUZuGiV7InZhbHVlcyI6WyJBMlp2Ync9PSJdLCJ0eXBlIjoiQ0F3PSJ9EgwIARIICgYIFBICCAgaCAoGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n5"} outputs:{key:"i0" value:"n6"} environment_id:"go"}} transforms:{key:"e7" value:{unique_name:"passert.EqualsList/Impulse'1" spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n7"}}} transforms:{key:"e8" value:{unique_name:"passert.EqualsList/passert.diffFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xaa\x02\n\x19beam:go:transform:dofn:v1\x1a\x8c\x02ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEqkBClwSSQgYEkUIGkpBZ2l0aHViLmNvbS9hcGFjaGUvYmVhbS9zZGtzL3YyL2dvL3BrZy9iZWFtL3Rlc3RpbmcvcGFzc2VydC5kaWZmRm4aD3sidHlwZSI6IkNB
dz0ifRIMCAESCAoGCBQSAggIEgoIBhIGCgQIGUAPEgoIBhIGCgQIGUAPGggKBgoECBlADxoICgYKBAgZQA8aCAoGCgQIGUAPIgVQYXJEbw==\x1aO\n\x02i1\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1\x1aO\n\x02i2\x12I\n\x1d\n\x1bbeam:side_input:iterable:v1\x12\x05\n\x03foo\x1a!\n\x1fbeam:go:windowmapping:global:v1"} inputs:{key:"i0" value:"n7"} inputs:{key:"i1" value:"n4"} inputs:{key:"i2" value:"n6"} outputs:{key:"i0" value:"n8"} outputs:{key:"i1" value:"n9"} outputs:{key:"i2" value:"n10"} environment_id:"go"}} transforms:{key:"e9" value:{unique_name:"passert.EqualsList/Impulse'2" spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n11"}}} transforms:{key:"s1" value:{unique_name:"DoNothing" subtransforms:"e4" inputs:{key:"n3" value:"n3"} outputs:{key:"n4" value:"n4"} environment_id:"go"}} transforms:{key:"s2" value:{unique_name:"passert.EqualsList" subtransforms:"e9" subtransforms:"e7" subtransforms:"e5" subtransforms:"e6" subtransforms:"e8" subtransforms:"e10" inputs:{ke
y:"n4" value:"n4"} environment_id:"go"}} pcollections:{key:"n1" value:{unique_name:"n1" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n10" value:{unique_name:"n10" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n11" value:{unique_name:"n11" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n2" value:{unique_name:"n2" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n3" value:{unique_name:"n3" coder_id:"c5" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n4" value:{unique_name:"n4" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n5" value:{unique_name:"n5" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n6" value:{unique_name:"n6" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n7" value:{unique_name:"n7" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}
} pcollections:{key:"n8" value:{unique_name:"n8" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n9" value:{unique_name:"n9" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} windowing_strategies:{key:"w0" value:{window_fn:{urn:"beam:window_fn:global_windows:v1"} merge_status:NON_MERGING window_coder_id:"c1" trigger:{default:{}} accumulation_mode:DISCARDING output_time:END_OF_WINDOW closing_behavior:EMIT_IF_NONEMPTY on_time_behavior:FIRE_IF_NONEMPTY environment_id:"go"}} coders:{key:"c0" value:{spec:{urn:"beam:coder:bytes:v1"}}} coders:{key:"c1" value:{spec:{urn:"beam:coder:global_window:v1"}}} coders:{key:"c2" value:{spec:{urn:"beam:coder:string_utf8:v1"}}} coders:{key:"c3" value:{spec:{urn:"beam:go:coder:custom:v1" payload:"Cgd2YXJpbnR6EgIIAhpdCklnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0vY29yZS9ydW50aW1lL2NvZGVyeC5lbmNWYXJJbnRaEhAIFiIECBlADyoGCBQSAggIImkKSWdpdGh1Yi5jb20vYXBhY2hlL2JlYW0vc2Rrcy92Mi9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvY29kZXJ4LmRlY1Zhckl
udFoSHAgWIgQIGUADIgYIFBICCAgqBAgZQA8qBAgZQAE="}}} coders:{key:"c4" value:{spec:{urn:"beam:coder:length_prefix:v1"} component_coder_ids:"c3"}} coders:{key:"c5" value:{spec:{urn:"beam:coder:kv:v1"} component_coder_ids:"c4" component_coder_ids:"c2"}} coders:{key:"c6" value:{spec:{urn:"beam:coder:varint:v1"}}} coders:{key:"c7" value:{spec:{urn:"beam:coder:timer:v1"} component_coder_ids:"c4" component_coder_ids:"c1"}} environments:{key:"go" value:{urn:"beam:env:external:v1" payload:"\n\x11\n\x0flocalhost:41841" capabilities:"beam:protocol:progress_reporting:v1" capabilities:"beam:protocol:multi_core_bundle_processing:v1" capabilities:"beam:transform:sdf_truncate_sized_restrictions:v1" capabilities:"beam:protocol:worker_status:v1" capabilities:"beam:protocol:monitoring_info_short_ids:v1" capabilities:"beam:version:sdk_base:go:apache/beam_go_sdk:2.59.0" capabilities:"beam:transform:to_string:v1" capabilities:"beam:protocol:data_sampling:v1" capabilities:"beam:protocol:sdk_consuming_received_data:v1" capabilities:"be
am:coder:bytes:v1" capabilities:"beam:coder:bool:v1" capabilities:"beam:coder:varint:v1" capabilities:"beam:coder:double:v1" capabilities:"beam:coder:string_utf8:v1" capabilities:"beam:coder:length_prefix:v1" capabilities:"beam:coder:kv:v1" capabilities:"beam:coder:iterable:v1" capabilities:"beam:coder:state_backed_iterable:v1" capabilities:"beam:coder:windowed_value:v1" capabilities:"beam:coder:global_window:v1" capabilities:"beam:coder:interval_window:v1" capabilities:"beam:coder:row:v1" capabilities:"beam:coder:nullable:v1" capabilities:"beam:coder:timer:v1" dependencies:{type_urn:"beam:artifact:type:file:v1" role_urn:"beam:artifact:role:go_worker_binary:v1"}}}} root_transform_ids:"e1" root_transform_ids:"e2" root_transform_ids:"e3" root_transform_ids:"s1" root_transform_ids:"s2" requirements:"beam:requirement:pardo:stateful:v1"
2024/09/27 22:21:21 Prepared job with id: job-001 and staging token: job-001
2024/09/27 22:21:21 Staged binary artifact with token: job-001
2024/09/27 22:21:21 Submitted job: job-001
2024/09/27 22:21:21  (): starting job-001[go-job-1-1727475681963997620]
2024/09/27 22:21:21  (): running job-001[go-job-1-1727475681963997620]
2024/09/27 22:21:21 Job[job-001] state: RUNNING
2024/09/27 22:21:21 starting worker job-001[go-job-1-1727475681963997620]_go
2024/09/27 22:21:21 INFO ProcessElement <0, foo> source=.../poc/donothing.go:45 time=2024-09-27T22:21:21.979Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.983Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 0 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.985Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.985Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 1 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.986Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.986Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 2 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.987Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.987Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 3 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.988Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.988Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 4 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.989Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.989Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 5 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.990Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.990Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 6 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.991Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.991Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 7 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.992Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.992Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 8 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.993Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.993Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 9 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.994Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.994Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 10 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.995Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.995Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO OnTimer <0, foo>: count 11 source=.../poc/donothing.go:88 time=2024-09-27T22:21:21.996Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:21:21.996Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO  (): pipeline completed job-001[go-job-1-1727475681963997620] source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2024-09-27T22:21:21.999Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO  (): terminating job-001[go-job-1-1727475681963997620] source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2024-09-27T22:21:21.999Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO Job[job-001] state: DONE source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/job.go:125 time=2024-09-27T22:21:21.999Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
2024/09/27 22:21:21 INFO stopping worker job-001[go-job-1-1727475681963997620]_go source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/extworker/extworker.go:117 time=2024-09-27T22:21:21.999Z worker.ID=job-001[go-job-1-1727475681963997620]_go worker.endpoint=localhost:45599
--- PASS: TestDoNothingTransform (0.04s)
PASS
ok      poc    0.204s

Without passert

Running tool: /usr/lib/google-golang/bin/go test -timeout 30s -run ^TestDoNothingTransform$ .../poc

=== RUN   TestDoNothingTransform
2024/09/27 22:22:36 INFO Serving JobManagement endpoint=localhost:40163
2024/09/27 22:22:36 starting Loopback server at 127.0.0.1:37333
2024/09/27 22:22:36 components:{transforms:{key:"e1" value:{unique_name:"Impulse" spec:{urn:"beam:transform:impulse:v1"} outputs:{key:"i0" value:"n1"}}} transforms:{key:"e2" value:{unique_name:"beam.createFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xfa\x01\n\x19beam:go:transform:dofn:v1\x1a\xdc\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEoUBCmQSOwgYEjcIGkozZ2l0aHViLmNvbS9hcGFjaGUvYmVhbS9zZGtzL3YyL2dvL3BrZy9iZWFtLmNyZWF0ZUZuGiV7InZhbHVlcyI6WyJBMlp2Ync9PSJdLCJ0eXBlIjoiQ0F3PSJ9EgwIARIICgYIFBICCAgaCAoGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n1"} outputs:{key:"i0" value:"n2"} environment_id:"go"}} transforms:{key:"e3" value:{unique_name:"beam.addFixedKeyFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xee\x01\n\x19beam:go:transform:dofn:v1\x1a\xd0\x01ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEn0KUApOCjhnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0uYWRkRml4ZWRLZXlGbhISCBYiBAgZQA8qAggCKgQIGUAPEgoIARIGCgQIGUAPGhYKFAoECBlACxIECgIIAhIGCgQIGUAPIgVQYXJEbw=="} inputs:{key:"i0" value:"n2"} outputs:{key:"
i0" value:"n3"} environment_id:"go"}} transforms:{key:"e4" value:{unique_name:"DoNothing/poc.doNothingTransformFn" spec:{urn:"beam:transform:pardo:v1" payload:"\n\xc2\x03\n\x19beam:go:transform:dofn:v1\x1a\xa4\x03ChliZWFtOmdvOnRyYW5zZm9ybTpkb2ZuOnYxEpsCCu8BElkIGBJVCBpKUWdpdGh1Yi5jb20vdmVyaWx5LXNyYy92ZXJpbHkxL2luZ2VzdGlvbi9waXBlbGluZS9zcmMvcGtnL3BvYy5kb05vdGhpbmdUcmFuc2Zvcm1GbhqRAXsiVGltZXJDb3VudCI6eyJLZXkiOiJ0aW1lckNvdW50In0sIlRpbWVyVGltZXJzdGFtcCI6eyJLZXkiOiJ0aW1lclRpbWVyc3RhbXAifSwiVmFsdWUiOnsiS2V5IjoidmFsdWUifSwiT3V0cHV0U3RhdGUiOnsiRmFtaWx5IjoicHJvY2Vzc2luZ1RpbWUifX0SGAgBEhQKBAgZQAsSBgoECBlAExIECgIIDBoGCgQKAggMIgVQYXJEbw==\"3\n\x0ftimerTimerstamp\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c6\")\n\x05value\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c2\".\n\ntimerCount\x12 :\x18\n\x16beam:user_state:bag:v1\n\x04\n\x02c6J\x18\n\x0eprocessingTime\x12\x06\x08\x02\x12\x02c7"} inputs:{key:"i0" value:"n3"} outputs:{key:"i0" value:"n4"} environment_id:"go"}} transforms:{key:"s1" value:{unique_name:"DoNo
thing" subtransforms:"e4" inputs:{key:"n3" value:"n3"} outputs:{key:"n4" value:"n4"} environment_id:"go"}} pcollections:{key:"n1" value:{unique_name:"n1" coder_id:"c0" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n2" value:{unique_name:"n2" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n3" value:{unique_name:"n3" coder_id:"c5" is_bounded:BOUNDED windowing_strategy_id:"w0"}} pcollections:{key:"n4" value:{unique_name:"n4" coder_id:"c2" is_bounded:BOUNDED windowing_strategy_id:"w0"}} windowing_strategies:{key:"w0" value:{window_fn:{urn:"beam:window_fn:global_windows:v1"} merge_status:NON_MERGING window_coder_id:"c1" trigger:{default:{}} accumulation_mode:DISCARDING output_time:END_OF_WINDOW closing_behavior:EMIT_IF_NONEMPTY on_time_behavior:FIRE_IF_NONEMPTY environment_id:"go"}} coders:{key:"c0" value:{spec:{urn:"beam:coder:bytes:v1"}}} coders:{key:"c1" value:{spec:{urn:"beam:coder:global_window:v1"}}} coders:{key:"c2" value:{spec:{urn:"beam:coder:string_u
tf8:v1"}}} coders:{key:"c3" value:{spec:{urn:"beam:go:coder:custom:v1" payload:"Cgd2YXJpbnR6EgIIAhpdCklnaXRodWIuY29tL2FwYWNoZS9iZWFtL3Nka3MvdjIvZ28vcGtnL2JlYW0vY29yZS9ydW50aW1lL2NvZGVyeC5lbmNWYXJJbnRaEhAIFiIECBlADyoGCBQSAggIImkKSWdpdGh1Yi5jb20vYXBhY2hlL2JlYW0vc2Rrcy92Mi9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvY29kZXJ4LmRlY1ZhckludFoSHAgWIgQIGUADIgYIFBICCAgqBAgZQA8qBAgZQAE="}}} coders:{key:"c4" value:{spec:{urn:"beam:coder:length_prefix:v1"} component_coder_ids:"c3"}} coders:{key:"c5" value:{spec:{urn:"beam:coder:kv:v1"} component_coder_ids:"c4" component_coder_ids:"c2"}} coders:{key:"c6" value:{spec:{urn:"beam:coder:varint:v1"}}} coders:{key:"c7" value:{spec:{urn:"beam:coder:timer:v1"} component_coder_ids:"c4" component_coder_ids:"c1"}} environments:{key:"go" value:{urn:"beam:env:external:v1" payload:"\n\x11\n\x0flocalhost:37333" capabilities:"beam:protocol:progress_reporting:v1" capabilities:"beam:protocol:multi_core_bundle_processing:v1" capabilities:"beam:transform:sdf_truncate_sized_restrictions:v1" capabilities
:"beam:protocol:worker_status:v1" capabilities:"beam:protocol:monitoring_info_short_ids:v1" capabilities:"beam:version:sdk_base:go:apache/beam_go_sdk:2.59.0" capabilities:"beam:transform:to_string:v1" capabilities:"beam:protocol:data_sampling:v1" capabilities:"beam:protocol:sdk_consuming_received_data:v1" capabilities:"beam:coder:bytes:v1" capabilities:"beam:coder:bool:v1" capabilities:"beam:coder:varint:v1" capabilities:"beam:coder:double:v1" capabilities:"beam:coder:string_utf8:v1" capabilities:"beam:coder:length_prefix:v1" capabilities:"beam:coder:kv:v1" capabilities:"beam:coder:iterable:v1" capabilities:"beam:coder:state_backed_iterable:v1" capabilities:"beam:coder:windowed_value:v1" capabilities:"beam:coder:global_window:v1" capabilities:"beam:coder:interval_window:v1" capabilities:"beam:coder:row:v1" capabilities:"beam:coder:nullable:v1" capabilities:"beam:coder:timer:v1" dependencies:{type_urn:"beam:artifact:type:file:v1" role_urn:"beam:artifact:role:go_worker_binary:v1"}}}} root_transform_ids:"e1" roo
t_transform_ids:"e2" root_transform_ids:"e3" root_transform_ids:"s1" requirements:"beam:requirement:pardo:stateful:v1"
2024/09/27 22:22:36 Prepared job with id: job-001 and staging token: job-001
2024/09/27 22:22:36 Staged binary artifact with token: job-001
2024/09/27 22:22:36 Submitted job: job-001
2024/09/27 22:22:36  (): starting job-001[go-job-1-1727475756568453106]
2024/09/27 22:22:36 Job[job-001] state: STARTING
2024/09/27 22:22:36  (): running job-001[go-job-1-1727475756568453106]
2024/09/27 22:22:36 Job[job-001] state: RUNNING
2024/09/27 22:22:36 starting worker job-001[go-job-1-1727475756568453106]_go
2024/09/27 22:22:36 INFO ProcessElement <0, foo> source=.../poc/donothing.go:45 time=2024-09-27T22:22:36.580Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:22:36.584Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO OnTimer <0, foo>: count 0 source=.../poc/donothing.go:88 time=2024-09-27T22:22:36.585Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:22:36.586Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO OnTimer <0, foo>: count 1 source=.../poc/donothing.go:88 time=2024-09-27T22:22:36.586Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO FinishBundle source=.../poc/donothing.go:104 time=2024-09-27T22:22:36.587Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO data.Recv timers for unknown bundle response="timers:{instruction_id:\"inst004\" transform_id:\"e4\" timer_family_id:\"processingTime\" is_last:true}"
2024/09/27 22:22:36 stopping worker job-001[go-job-1-1727475756568453106]_go
2024/09/27 22:22:36 INFO  (): pipeline completed job-001[go-job-1-1727475756568453106] source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2024-09-27T22:22:36.587Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO  (): terminating job-001[go-job-1-1727475756568453106] source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2024-09-27T22:22:36.587Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
2024/09/27 22:22:36 INFO Job[job-001] state: DONE source=.../go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/runners/universal/runnerlib/job.go:125 time=2024-09-27T22:22:36.587Z worker.ID=job-001[go-job-1-1727475756568453106]_go worker.endpoint=localhost:45899
--- PASS: TestDoNothingTransform (0.02s)
PASS
ok      .../poc    0.150s

@lostluck
Contributor

If I understand correctly, the issue is:

  1. There is a processing time timer in a window that has fired, and the state has been cleaned out.
  • It sounds like the timer is firing after the window has been garbage collected.

That does sound like an issue with Dataflow.

As for Prism, this does seem like an edge case, with or without real-time execution. But technically it's not related to whether windowing is applied. I'd recommend filing it as a separate issue, even though it uses more or less the same pipeline.

Some notes:

  1. Setting OutputTimestamp prevents the downstream watermark from advancing, not the upstream/input watermark.
  2. Windows are a notion around the Aggregation/GroupByKey implementation, which does include side inputs, which is what's happening with the . I don't know exactly what is needed with windows and stateful DoFns, beyond the fact that state and timers are bound to a window and are garbage collected accordingly.

Technically this seems to be around looping timers, along with ProcessingTime weirdness.
