From 9fd2cf84c5fd27769c2b72ad0caf62ab378a8e53 Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Wed, 21 Jul 2021 00:22:08 -0700 Subject: [PATCH] multiplex: add (*Multiplex).CloseChan This commit exposes the underlying mp.closed channel to the caller, and allows the caller to receive an event on session closed. --- multiplex.go | 5 +++++ multiplex_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/multiplex.go b/multiplex.go index 0caf106..ecc53da 100644 --- a/multiplex.go +++ b/multiplex.go @@ -165,6 +165,11 @@ func (mp *Multiplex) IsClosed() bool { } } +// CloseChan returns a read-only channel which will be closed when the session is closed +func (mp *Multiplex) CloseChan() <-chan struct{} { + return mp.closed +} + func (mp *Multiplex) sendMsg(timeout, cancel <-chan struct{}, header uint64, data []byte) error { buf := pool.Get(len(data) + 20) diff --git a/multiplex_test.go b/multiplex_test.go index fd218af..1f27f24 100644 --- a/multiplex_test.go +++ b/multiplex_test.go @@ -394,6 +394,42 @@ func TestClosing(t *testing.T) { } } +func TestCloseChan(t *testing.T) { + a, b := net.Pipe() + + mpa := NewMultiplex(a, false) + mpb := NewMultiplex(b, true) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + defer cancel() + + _, err := mpb.NewStream(ctx) + if err != nil { + t.Fatal(err) + } + + _, err = mpa.Accept() + if err != nil { + t.Fatal(err) + } + + go func() { + mpa.Close() + }() + + select { + case <-ctx.Done(): + t.Fatal("did not receive from CloseChan for mpa within timeout") + case <-mpa.CloseChan(): + } + + select { + case <-ctx.Done(): + t.Fatal("did not receive from CloseChan for mpb within timeout") + case <-mpb.CloseChan(): + } +} + func TestReset(t *testing.T) { a, b := net.Pipe()