Skip to content

Commit

Permalink
feat: add safe_channel pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-ywliu committed Oct 17, 2024
1 parent 2602d00 commit da29aef
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 0 deletions.
38 changes: 38 additions & 0 deletions safe_channel/safe_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package safe_channel

import "sync"

type SafeCh[T any] struct {
mu sync.Mutex
closed bool
ch chan T
}

func NewSafeCh[T any](size int) *SafeCh[T] {
return &SafeCh[T]{
ch: make(chan T, size),
}
}

func (c *SafeCh[T]) Send(e T) {
c.mu.Lock()
defer c.mu.Unlock()

if !c.closed {
c.ch <- e
}
}

func (c *SafeCh[T]) GetRcvChan() <-chan T {
return c.ch
}

func (c *SafeCh[T]) Close() {
c.mu.Lock()
defer c.mu.Unlock()

if !c.closed {
close(c.ch)
c.closed = true
}
}
42 changes: 42 additions & 0 deletions safe_channel/safe_channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package safe_channel

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// Simulate 1 receiver and N(N=2) senders situation
func TestSafeChannel(t *testing.T) {
sCh := NewSafeCh[int](1)
wg := sync.WaitGroup{}

// Two senders
wg.Add(2)
for i := 0; i < 2; i++ {
go func(i int) {
if i == 0 {
// Case: send after sCh closed
time.Sleep(1 * time.Second)

require.Equal(t, sCh.closed, true)
sCh.Send(1) // No panic
require.Equal(t, sCh.closed, true)
} else {
// Case: send success
sCh.Send(1)
require.Equal(t, sCh.closed, false)
}
wg.Done()
}(i)
}

// One receiver
<-sCh.GetRcvChan()
sCh.Close()
require.Equal(t, sCh.closed, true)

wg.Wait()
}

0 comments on commit da29aef

Please sign in to comment.