From 23b7743a8583ab16f347292f0ddea721291aac31 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Tue, 27 Aug 2024 06:59:54 +0800 Subject: [PATCH] event: include Feed type fixation logic in f.init (#27249) --- event/feed.go | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/event/feed.go b/event/feed.go index 33dafe58867e..d94bd820f0c5 100644 --- a/event/feed.go +++ b/event/feed.go @@ -57,7 +57,8 @@ func (e feedTypeError) Error() string { return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String() } -func (f *Feed) init() { +func (f *Feed) init(etype reflect.Type) { + f.etype = etype f.removeSub = make(chan interface{}) f.sendLock = make(chan struct{}, 1) f.sendLock <- struct{}{} @@ -70,8 +71,6 @@ func (f *Feed) init() { // The channel should have ample buffer space to avoid blocking other subscribers. // Slow subscribers are not dropped. func (f *Feed) Subscribe(channel interface{}) Subscription { - f.once.Do(f.init) - chanval := reflect.ValueOf(channel) chantyp := chanval.Type() if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 { @@ -79,11 +78,13 @@ func (f *Feed) Subscribe(channel interface{}) Subscription { } sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)} - f.mu.Lock() - defer f.mu.Unlock() - if !f.typecheck(chantyp.Elem()) { + f.once.Do(func() { f.init(chantyp.Elem()) }) + if f.etype != chantyp.Elem() { panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)}) } + + f.mu.Lock() + defer f.mu.Unlock() // Add the select case to the inbox. // The next Send will add it to f.sendCases. cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval} @@ -91,15 +92,6 @@ func (f *Feed) Subscribe(channel interface{}) Subscription { return sub } -// note: callers must hold f.mu -func (f *Feed) typecheck(typ reflect.Type) bool { - if f.etype == nil { - f.etype = typ - return true - } - return f.etype == typ -} - func (f *Feed) remove(sub *feedSub) { // Delete from inbox first, which covers channels // that have not been added to f.sendCases yet. @@ -128,19 +120,17 @@ func (f *Feed) remove(sub *feedSub) { func (f *Feed) Send(value interface{}) (nsent int) { rvalue := reflect.ValueOf(value) - f.once.Do(f.init) + f.once.Do(func() { f.init(rvalue.Type()) }) + if f.etype != rvalue.Type() { + panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) + } + <-f.sendLock // Add new cases from the inbox after taking the send lock. f.mu.Lock() f.sendCases = append(f.sendCases, f.inbox...) f.inbox = nil - - if !f.typecheck(rvalue.Type()) { - f.sendLock <- struct{}{} - f.mu.Unlock() - panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) - } f.mu.Unlock() // Set the sent value on all channels.