Skip to content

Commit

Permalink
streamed readings (#433)
Browse files Browse the repository at this point in the history
* poc for device read streaming

* bump
  • Loading branch information
edaniszewski committed Mar 30, 2020
1 parent 6e47b77 commit 20fa007
Show file tree
Hide file tree
Showing 8 changed files with 487 additions and 17 deletions.
22 changes: 11 additions & 11 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

[[constraint]]
name = "github.com/vapor-ware/synse-server-grpc"
version = "3.0.0-alpha.1"
branch = "v3/staging"

[[constraint]]
branch = "master"
Expand Down
33 changes: 33 additions & 0 deletions internal/test/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,39 @@ func (mock *MockReadCachedStreamErr) Send(reading *synse.V3Reading) error {
return fmt.Errorf("grpc error")
}

//
// READ STREAM
//

// MockReadStreamStream mocks the stream for the ReadCached request, with no error.
type MockReadStreamStream struct {
grpc.ServerStream
Results []*synse.V3Reading
}

// NewMockReadStreamStream creates a new mock read cache stream.
func NewMockReadStreamStream() *MockReadStreamStream {
return &MockReadStreamStream{
Results: []*synse.V3Reading{},
}
}

// Send fulfils the stream interface for the mock grpc stream.
func (mock *MockReadStreamStream) Send(reading *synse.V3Reading) error {
mock.Results = append(mock.Results, reading)
return nil
}

// MockReadStreamStreamErr mocks the stream for a ReadCached request, with error.
type MockReadStreamStreamErr struct {
grpc.ServerStream
}

// Send fulfils the stream interface for the mock grpc stream.
func (mock *MockReadStreamStreamErr) Send(reading *synse.V3Reading) error {
return fmt.Errorf("grpc error")
}

//
// WRITE ASYNC
//
Expand Down
59 changes: 59 additions & 0 deletions sdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,65 @@ func (server *server) ReadCache(request *synse.V3Bounds, stream synse.V3Plugin_R
return nil
}

// ReadStream streams readings to the caller as they are read from the plugin.
func (server *server) ReadStream(request *synse.V3StreamRequest, stream synse.V3Plugin_ReadStreamServer) error {
log.WithFields(log.Fields{
"selectors": request.Selectors,
"route": "READSTREAM",
}).Info("[grpc] processing request")

// Get all devices which will match the stream selector.
devices := map[string]*Device{}
for _, s := range request.Selectors {
devs, err := server.deviceManager.GetDevices(s)
if err != nil {
return err
}
for _, d := range devs {
if _, ok := devices[d.id]; !ok {
devices[d.id] = d
}
}
}

// If any selectors were specified, but no devices were found matching those
// selectors, return an error. This prevents the invalid selector(s) from defaulting
// to return readings for all configured devices.
if len(devices) == 0 && len(request.Selectors) != 0 {
return errors.New("specified selector does not match any known devices")
}

filter := make([]string, len(devices))
for id := range devices {
filter = append(filter, id)
}

s := newReadStream(filter)
server.stateManager.addStream(s)
defer func() {
server.stateManager.removeStream(s.id)
s.close()
}()
go s.listen()

log.Info("[server] streaming readings from device manager")
for r := range s.readings {
device := server.deviceManager.GetDevice(r.Device)
for _, data := range r.Reading {
reading := data.Encode()
if device != nil {
reading.Id = device.id
reading.DeviceType = device.Type
}
if err := stream.Send(reading); err != nil {
return err
}
}
}
log.Info("[server] done streaming readings")
return nil
}

// WriteAsync writes data to the specified plugin device. A transaction ID is returned
// so the status of the write can be checked asynchronously.
//
Expand Down
85 changes: 80 additions & 5 deletions sdk/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,86 @@ func TestServer_ReadCache_error(t *testing.T) {
assert.Error(t, err)
}

func TestServer_ReadStream_noDeviceMatchID(t *testing.T) {
o := output.Output{
Name: "test",
Type: "foo",
}

s := server{
stateManager: &stateManager{
readingsLock: &sync.RWMutex{},
config: &config.PluginSettings{
Cache: &config.CacheSettings{
Enabled: false,
},
},
readings: map[string][]*output.Reading{
"12345": {o.MakeReading(1)},
"67890": {o.MakeReading(2)},
"abcde": {o.MakeReading(3)},
},
},
deviceManager: &deviceManager{
devices: map[string]*Device{},
aliasCache: NewAliasCache(),
},
}

req := &synse.V3StreamRequest{
Selectors: []*synse.V3DeviceSelector{
{Id: "998877"},
},
}
mock := test.NewMockReadStreamStream()
err := s.ReadStream(req, mock)

assert.Error(t, err)
assert.Equal(t, 0, len(mock.Results))
}

func TestServer_ReadStream_noDeviceMatchTag(t *testing.T) {
o := output.Output{
Name: "test",
Type: "foo",
}

s := server{
stateManager: &stateManager{
readingsLock: &sync.RWMutex{},
config: &config.PluginSettings{
Cache: &config.CacheSettings{
Enabled: false,
},
},
readings: map[string][]*output.Reading{
"12345": {o.MakeReading(1)},
"67890": {o.MakeReading(2)},
"abcde": {o.MakeReading(3)},
},
},
deviceManager: &deviceManager{
devices: map[string]*Device{},
aliasCache: NewAliasCache(),
tagCache: NewTagCache(),
},
}

req := &synse.V3StreamRequest{
Selectors: []*synse.V3DeviceSelector{
{Tags: []*synse.V3Tag{{
Namespace: "nonexistent",
Label: "tag",
}}},
},
}
mock := test.NewMockReadStreamStream()
err := s.ReadStream(req, mock)

assert.Error(t, err)
assert.Equal(t, 0, len(mock.Results))
}

func TestServer_WriteAsync(t *testing.T) {
handler := DeviceHandler{
Write: func(device *Device, data *WriteData) error {
Expand All @@ -924,11 +1004,6 @@ func TestServer_WriteAsync(t *testing.T) {
}
s := server{
deviceManager: &deviceManager{
//tagCache: &TagCache{
// cache: map[string]map[string]map[string][]*Device{
// "system": {"id": {"1234": {{id: "1234", handler: &handler}}}},
// },
//},
devices: map[string]*Device{
"1234": {id: "1234", handler: &handler},
},
Expand Down
36 changes: 36 additions & 0 deletions sdk/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/patrickmn/go-cache"
log "github.com/sirupsen/logrus"
"github.com/vapor-ware/synse-sdk/sdk/config"
Expand All @@ -37,6 +38,9 @@ type stateManager struct {
readingsCache *cache.Cache
readingsLock *sync.RWMutex
transactions *cache.Cache

streams map[uuid.UUID]ReadStream
streamLock *sync.Mutex
}

// newStateManager creates a new instance of the stateManager.
Expand All @@ -61,6 +65,8 @@ func newStateManager(conf *config.PluginSettings) *stateManager {
),
readingsCache: readingsCache,
readingsLock: &sync.RWMutex{},
streams: make(map[uuid.UUID]ReadStream),
streamLock: &sync.Mutex{},
}
}

Expand All @@ -70,6 +76,22 @@ func (manager *stateManager) Start() {
go manager.updateReadings()
}

// addStream adds a new stream for the stateManager to send reading data to.
func (manager *stateManager) addStream(stream ReadStream) {
manager.streamLock.Lock()
defer manager.streamLock.Unlock()

manager.streams[stream.id] = stream
}

// removeStream removes a stream which the stateManager was sending data to.
func (manager *stateManager) removeStream(id uuid.UUID) {
manager.streamLock.Lock()
defer manager.streamLock.Unlock()

delete(manager.streams, id)
}

// registerActions registers pre-run (setup) and post-run (teardown) actions
// for the state manager.
func (manager *stateManager) registerActions(plugin *Plugin) {
Expand Down Expand Up @@ -113,11 +135,25 @@ func (manager *stateManager) updateReadings() {
manager.readings[id] = readings
manager.readingsLock.Unlock()

// Dispatch the reading to all connected streams.
manager.dispatchToStreams(reading)

// Update the local readings cache, if enabled.
manager.addReadingToCache(reading)
}
}

// dispatchToStreams dispatches the given reading to all streams currently
// connected to the state manager.
func (manager *stateManager) dispatchToStreams(reading *ReadContext) {
manager.streamLock.Lock()
defer manager.streamLock.Unlock()

for _, stream := range manager.streams {
stream.stream <- reading
}
}

// addReadingToCache adds the given reading to the readingsCache, if the plugin
// is configured to enable read caching.
func (manager *stateManager) addReadingToCache(ctx *ReadContext) {
Expand Down
Loading

0 comments on commit 20fa007

Please sign in to comment.