feat: transport layer performance optimization
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
hwjiangkai committed Dec 17, 2022
1 parent 6e9856b commit 72be0cc
Showing 13 changed files with 1,855 additions and 347 deletions.
295 changes: 211 additions & 84 deletions client/internal/vanus/store/block_store.go
@@ -38,13 +38,9 @@ import (
"github.com/linkall-labs/vanus/client/internal/vanus/codec"
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc"
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc/bare"
"github.com/linkall-labs/vanus/client/pkg/api"
"github.com/linkall-labs/vanus/client/pkg/primitive"
"github.com/linkall-labs/vanus/observability/log"
)

const (
defaultTaskChannelBuffer = 512
"github.com/linkall-labs/vanus/pkg/errors"
)

func newBlockStore(endpoint string) (*BlockStore, error) {
@@ -56,120 +52,124 @@ func newBlockStore(endpoint string) (*BlockStore, error) {
})),
tracer: tracing.NewTracer("internal.store.BlockStore", trace.SpanKindClient),
}
s.stream, err = s.connect(context.Background())
s.appendStream, err = s.connectAppendStream(context.Background())
if err != nil {
// TODO: check error
return nil, err
}
s.taskC = make(chan Task, defaultTaskChannelBuffer)
s.run(context.Background())
s.receive(context.Background(), s.stream)
s.readStream, err = s.connectReadStream(context.Background())
if err != nil {
// TODO: check error
return nil, err
}
s.runAppendStreamRecv(context.Background(), s.appendStream)
s.runReadStreamRecv(context.Background(), s.readStream)
return s, nil
}

type BlockStore struct {
primitive.RefCount
client rpc.Client
tracer *tracing.Tracer
stream segpb.SegmentServer_AppendToBlockStreamClient
taskC chan Task
callbacks sync.Map
mu sync.Mutex
client rpc.Client
tracer *tracing.Tracer
appendStream segpb.SegmentServer_AppendToBlockStreamClient
readStream segpb.SegmentServer_ReadFromBlockStreamClient
appendCallbacks sync.Map
readCallbacks sync.Map
appendMu sync.Mutex
readMu sync.Mutex
}

type Task struct {
request *segpb.AppendToBlockStreamRequest
cb api.Callback
}
type appendCallback func(*segpb.AppendToBlockStreamResponse)
type readCallback func(*segpb.ReadFromBlockStreamResponse)

func (s *BlockStore) run(ctx context.Context) {
func (s *BlockStore) runAppendStreamRecv(ctx context.Context, stream segpb.SegmentServer_AppendToBlockStreamClient) {
go func() {
for {
var err error
select {
case <-ctx.Done():
s.stream.CloseSend()
s.stream = nil
return
case task := <-s.taskC:
stream := s.stream
if stream == nil {
if stream, err = s.connect(ctx); err != nil {
task.cb(err)
break
}
s.receive(ctx, stream)
s.stream = stream
}
if err = stream.Send(task.request); err != nil {
log.Warning(ctx, "===Send failed===", map[string]interface{}{
log.KeyError: err,
})
s.processSendError(task, err)
}
res, err := stream.Recv()
if err != nil {
log.Error(ctx, "append stream recv failed", map[string]interface{}{
log.KeyError: err,
})
break
}
c, _ := s.appendCallbacks.LoadAndDelete(res.ResponseId)
if c != nil {
c.(appendCallback)(res)
}
}
}()
}

func (s *BlockStore) receive(ctx context.Context, stream segpb.SegmentServer_AppendToBlockStreamClient) {
func (s *BlockStore) runReadStreamRecv(ctx context.Context, stream segpb.SegmentServer_ReadFromBlockStreamClient) {
go func() {
for {
res, err := stream.Recv()
if err != nil {
log.Warning(ctx, "===Recv failed===", map[string]interface{}{
log.Error(ctx, "read stream recv failed", map[string]interface{}{
log.KeyError: err,
})
break
}
c, _ := s.callbacks.LoadAndDelete(res.ResponseId)
if c == nil {
// TODO(jiangkai): check err
continue
}
if res.ResponseCode != segpb.ResponseCode_SUCCESS {
c.(api.Callback)(stderr.New(res.ResponseCode.String()))
c, _ := s.readCallbacks.LoadAndDelete(res.ResponseId)
if c != nil {
c.(readCallback)(res)
}
}
}()
}
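
Both receive loops pair a streamed response with its waiting caller through the callback maps: the sender registers a callback under a freshly generated request ID, and the receive goroutine dispatches by the ResponseId echoed back by the server. A minimal, self-contained sketch of that correlation pattern (the response type and channel below are stand-ins for the gRPC stream, not part of this change):

// Sketch (not part of this diff): matching a stored callback to the response
// that carries its ID, as runAppendStreamRecv/runReadStreamRecv do.
package main

import (
    "fmt"
    "math/rand"
    "sync"
)

type response struct {
    ResponseId uint64
    Body       string
}

func main() {
    var pending sync.Map              // requestID -> callback, like appendCallbacks/readCallbacks
    responses := make(chan *response) // stands in for stream.Recv()

    // Receive loop: dispatch each response to the callback stored under its ID.
    go func() {
        for res := range responses {
            if c, ok := pending.LoadAndDelete(res.ResponseId); ok {
                c.(func(*response))(res)
            }
        }
    }()

    // Sender side: register the callback, send the request, then wait.
    var wg sync.WaitGroup
    wg.Add(1)
    requestID := rand.Uint64()
    var got *response
    pending.Store(requestID, func(res *response) {
        got = res
        wg.Done()
    })
    responses <- &response{ResponseId: requestID, Body: "ok"}
    wg.Wait()
    fmt.Println(got.Body)
}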

func (s *BlockStore) connect(ctx context.Context) (segpb.SegmentServer_AppendToBlockStreamClient, error) {
if s.stream != nil {
return s.stream, nil
func (s *BlockStore) connectAppendStream(ctx context.Context) (segpb.SegmentServer_AppendToBlockStreamClient, error) {
if s.appendStream != nil {
return s.appendStream, nil
}

s.mu.Lock()
defer s.mu.Unlock()
s.appendMu.Lock()
defer s.appendMu.Unlock()

if s.stream != nil { //double check
return s.stream, nil
if s.appendStream != nil { //double check
return s.appendStream, nil
}

client, err := s.client.Get(ctx)
if err != nil {
return nil, err
}

stream, err := client.(segpb.SegmentServerClient).AppendToBlockStream(context.Background())
stream, err := client.(segpb.SegmentServerClient).AppendToBlockStream(ctx)
if err != nil {
log.Warning(ctx, "===Get Stream failed===", map[string]interface{}{
log.Warning(ctx, "get append stream failed", map[string]interface{}{
log.KeyError: err,
})
return nil, err
}
return stream, nil
}

func (s *BlockStore) processSendError(t Task, err error) {
cb, _ := s.callbacks.LoadAndDelete(t.request.RequestId)
if cb != nil {
cb.(api.Callback)(err)
func (s *BlockStore) connectReadStream(ctx context.Context) (segpb.SegmentServer_ReadFromBlockStreamClient, error) {
if s.readStream != nil {
return s.readStream, nil
}
if stderr.Is(err, io.EOF) {
s.stream.CloseSend()
s.stream = nil

s.readMu.Lock()
defer s.readMu.Unlock()

if s.readStream != nil { //double check
return s.readStream, nil
}

client, err := s.client.Get(ctx)
if err != nil {
return nil, err
}

stream, err := client.(segpb.SegmentServerClient).ReadFromBlockStream(ctx)
if err != nil {
log.Warning(ctx, "get read stream failed", map[string]interface{}{
log.KeyError: err,
})
return nil, err
}
return stream, nil
}
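
connectAppendStream and connectReadStream lazily dial their stream and guard it with a per-stream mutex plus a second check, so concurrent callers end up sharing a single connection. A rough sketch of that lazy, double-checked connect (this variant holds the stream in an atomic.Pointer so the unlocked fast path stays race-free; dial stands in for the gRPC stream constructor):

// Sketch (not part of this diff): lazy, double-checked stream initialization.
package sketch

import (
    "context"
    "sync"
    "sync/atomic"
)

type stream struct{} // placeholder for the concrete gRPC stream type

type lazyConn struct {
    mu     sync.Mutex
    stream atomic.Pointer[stream]
    dial   func(ctx context.Context) (*stream, error)
}

func (l *lazyConn) get(ctx context.Context) (*stream, error) {
    if s := l.stream.Load(); s != nil { // fast path, no lock
        return s, nil
    }
    l.mu.Lock()
    defer l.mu.Unlock()
    if s := l.stream.Load(); s != nil { // double check under the lock
        return s, nil
    }
    s, err := l.dial(ctx)
    if err != nil {
        return nil, err
    }
    l.stream.Store(s)
    return s, nil
}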

func (s *BlockStore) Endpoint() string {
@@ -207,34 +207,79 @@ func (s *BlockStore) Append(ctx context.Context, block uint64, event *ce.Event)
return res.GetOffsets()[0], nil
}

func (s *BlockStore) AppendStream(ctx context.Context, block uint64, event *ce.Event, cb api.Callback) {
_, span := s.tracer.Start(ctx, "AppendStream")
func (s *BlockStore) SyncAppendStream(ctx context.Context, block uint64, event *ce.Event) (int64, error) {
_ctx, span := s.tracer.Start(ctx, "SyncAppendStream")
defer span.End()

var (
err error
err error
wg sync.WaitGroup
resp *segpb.AppendToBlockStreamResponse
)

eventpb, err := codec.ToProto(event)
if err != nil {
cb(err)
return
if s.appendStream == nil {
s.appendStream, err = s.connectAppendStream(_ctx)
if err != nil {
return -1, err
}
s.runAppendStreamRecv(_ctx, s.appendStream)
}

// generate unique RequestId
requestID := rand.Uint64()
s.callbacks.Store(requestID, cb)
task := Task{
request: &segpb.AppendToBlockStreamRequest{
RequestId: requestID,
BlockId: block,
Events: &cepb.CloudEventBatch{
Events: []*cepb.CloudEvent{eventpb},
},

wg.Add(1)

eventpb, err := codec.ToProto(event)
if err != nil {
return -1, err
}

s.appendCallbacks.Store(requestID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
resp = res
wg.Done()
}))

req := &segpb.AppendToBlockStreamRequest{
RequestId: requestID,
BlockId: block,
Events: &cepb.CloudEventBatch{
Events: []*cepb.CloudEvent{eventpb},
},
cb: cb,
}
s.taskC <- task

if err = s.appendStream.Send(req); err != nil {
log.Error(ctx, "append stream send failed", map[string]interface{}{
log.KeyError: err,
})
if stderr.Is(err, io.EOF) {
s.appendStream.CloseSend()
s.appendStream = nil
c, _ := s.appendCallbacks.LoadAndDelete(requestID)
if c != nil {
c.(appendCallback)(&segpb.AppendToBlockStreamResponse{
ResponseId: requestID,
ResponseCode: segpb.ResponseCode_UNKNOWN,
Offsets: []int64{},
})
}
}
return -1, err
}

wg.Wait()

if resp.ResponseCode == segpb.ResponseCode_SegmentFull {
log.Warning(ctx, "block append failed cause the segment is full", nil)
return -1, errors.ErrSegmentFull
}

if resp.ResponseCode != segpb.ResponseCode_SUCCESS {
log.Warning(ctx, "block append failed cause unknown error", nil)
return -1, errors.ErrUnknown
}

return resp.Offsets[0], nil
}
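
As a usage sketch, a caller inside this package might append an event synchronously over the shared stream roughly as follows; the endpoint, block ID, and event payload are placeholders, and fmt is assumed to be imported:

// Hypothetical caller (same package); endpoint, block ID and payload are placeholders.
func exampleAppend(ctx context.Context) error {
    store, err := newBlockStore("127.0.0.1:11811")
    if err != nil {
        return err
    }

    event := ce.NewEvent()
    event.SetID("example-id")
    event.SetSource("example/source")
    event.SetType("example.type")
    _ = event.SetData(ce.ApplicationJSON, map[string]string{"hello": "world"})

    var blockID uint64 = 1 // placeholder block ID
    offset, err := store.SyncAppendStream(ctx, blockID, &event)
    if err != nil {
        return err // e.g. errors.ErrSegmentFull when the segment has no room left
    }
    fmt.Printf("appended at offset %d\n", offset)
    return nil
}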

func (s *BlockStore) Read(
@@ -278,6 +323,88 @@ func (s *BlockStore) Read(
return []*ce.Event{}, err
}

func (s *BlockStore) SyncReadStream(
ctx context.Context, block uint64, offset int64, size int16, pollingTimeout uint32,
) ([]*ce.Event, error) {
_ctx, span := s.tracer.Start(ctx, "SyncReadStream")
defer span.End()

var (
err error
wg sync.WaitGroup
resp *segpb.ReadFromBlockStreamResponse
)

if s.readStream == nil {
s.readStream, err = s.connectReadStream(_ctx)
if err != nil {
return []*ce.Event{}, err
}
s.runReadStreamRecv(_ctx, s.readStream)
}

// generate unique RequestId
requestID := rand.Uint64()

wg.Add(1)

s.readCallbacks.Store(requestID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
resp = res
wg.Done()
}))

req := &segpb.ReadFromBlockStreamRequest{
BlockId: block,
Offset: offset,
Number: int64(size),
PollingTimeout: pollingTimeout,
}

if err = s.readStream.Send(req); err != nil {
log.Error(ctx, "read stream send failed", map[string]interface{}{
log.KeyError: err,
})
if stderr.Is(err, io.EOF) {
s.readStream.CloseSend()
s.readStream = nil
c, _ := s.readCallbacks.LoadAndDelete(requestID)
if c != nil {
c.(readCallback)(&segpb.ReadFromBlockStreamResponse{
ResponseId: requestID,
ResponseCode: segpb.ResponseCode_UNKNOWN,
Events: &cepb.CloudEventBatch{
Events: []*cepb.CloudEvent{},
},
})
}
}
return []*ce.Event{}, err
}

wg.Wait()

if resp.ResponseCode != segpb.ResponseCode_SUCCESS {
log.Warning(ctx, "block append failed cause unknown error", nil)
return []*ce.Event{}, errors.ErrUnknown
}

if batch := resp.GetEvents(); batch != nil {
if eventpbs := batch.GetEvents(); len(eventpbs) > 0 {
events := make([]*ce.Event, 0, len(eventpbs))
for _, eventpb := range eventpbs {
event, err2 := codec.FromProto(eventpb)
if err2 != nil {
// TODO: return events or error?
return events, err2
}
events = append(events, event)
}
return events, nil
}
}
return []*ce.Event{}, errors.ErrUnknown
}
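
A matching read-side sketch; the offset, batch size, and polling timeout are placeholder values:

// Hypothetical caller (same package); offset, batch size and polling timeout are placeholders.
func exampleRead(ctx context.Context, store *BlockStore, blockID uint64) error {
    events, err := store.SyncReadStream(ctx, blockID, 0, 16, 3000)
    if err != nil {
        return err
    }
    for _, e := range events {
        fmt.Println(e.ID())
    }
    return nil
}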

func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Time) (int64, error) {
ctx, span := s.tracer.Start(ctx, "LookupOffset")
defer span.End()
