Skip to content

Commit

Permalink
rename pubsub package to subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Maciej Winnicki committed Aug 14, 2017
1 parent 748481e commit 7c225f0
Show file tree
Hide file tree
Showing 13 changed files with 64 additions and 224 deletions.
8 changes: 4 additions & 4 deletions api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/serverless/event-gateway/internal/httpapi"
"github.com/serverless/event-gateway/internal/kv"
"github.com/serverless/event-gateway/internal/metrics"
"github.com/serverless/event-gateway/pubsub"
"github.com/serverless/event-gateway/subscriptions"
)

// StartConfigAPI creates a new configuration API server and listens for requests.
Expand All @@ -28,15 +28,15 @@ func StartConfigAPI(config httpapi.Config) httpapi.Server {
functionsAPI := &functions.HTTPAPI{Functions: functionService}
functionsAPI.RegisterRoutes(router)

pubsubService := &pubsub.PubSub{
subscriptionsService := &subscriptions.Subscriptions{
TopicsDB: kv.NewPrefixedStore("/serverless-event-gateway/topics", config.KV),
SubscriptionsDB: kv.NewPrefixedStore("/serverless-event-gateway/subscriptions", config.KV),
EndpointsDB: kv.NewPrefixedStore("/serverless-event-gateway/endpoints", config.KV),
FunctionsDB: functionsDB,
Log: config.Log,
}
pubsubAPI := &pubsub.HTTPAPI{PubSub: pubsubService}
pubsubAPI.RegisterRoutes(router)
subscriptionsAPI := &subscriptions.HTTPAPI{Subscriptions: subscriptionsService}
subscriptionsAPI.RegisterRoutes(router)

router.GET("/v1/status", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {})
router.Handler("GET", "/metrics", prometheus.Handler())
Expand Down
20 changes: 10 additions & 10 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"go.uber.org/zap"

"github.com/serverless/event-gateway/functions"
"github.com/serverless/event-gateway/pubsub"
"github.com/serverless/event-gateway/subscriptions"
)

// cacher is a simplification of the db.Reactive interface, which doesn't care about
Expand Down Expand Up @@ -78,52 +78,52 @@ func (c *functionCache) Del(k string, v []byte) {
type endpointCache struct {
sync.RWMutex
// cache maps from EndpointID to Endpoint
cache map[pubsub.EndpointID]*pubsub.Endpoint
cache map[subscriptions.EndpointID]*subscriptions.Endpoint
log *zap.Logger
}

func newEndpointCache(log *zap.Logger) *endpointCache {
return &endpointCache{
cache: map[pubsub.EndpointID]*pubsub.Endpoint{},
cache: map[subscriptions.EndpointID]*subscriptions.Endpoint{},
log: log,
}
}

func (c *endpointCache) Set(k string, v []byte) {
e := &pubsub.Endpoint{}
e := &subscriptions.Endpoint{}
err := json.NewDecoder(bytes.NewReader(v)).Decode(e)
if err != nil {
c.log.Error("Could not deserialize Endpoint state!", zap.Error(err), zap.String("key", k), zap.String("value", string(v)))
} else {
c.Lock()
defer c.Unlock()
c.cache[pubsub.EndpointID(k)] = e
c.cache[subscriptions.EndpointID(k)] = e
}
}

func (c *endpointCache) Del(k string, v []byte) {
c.Lock()
defer c.Unlock()
delete(c.cache, pubsub.EndpointID(k))
delete(c.cache, subscriptions.EndpointID(k))
}

type subscriptionCache struct {
sync.RWMutex
// topicToSub maps from a TopicID to a set of subscribing FunctionID's
topicToFns map[pubsub.TopicID]map[functions.FunctionID]struct{}
topicToFns map[subscriptions.TopicID]map[functions.FunctionID]struct{}
log *zap.Logger
}

func newSubscriptionCache(log *zap.Logger) *subscriptionCache {
return &subscriptionCache{
// topicToFns is a map from TopicID to a set of FunctionID's
topicToFns: map[pubsub.TopicID]map[functions.FunctionID]struct{}{},
topicToFns: map[subscriptions.TopicID]map[functions.FunctionID]struct{}{},
log: log,
}
}

func (c *subscriptionCache) Set(k string, v []byte) {
s := pubsub.Subscription{}
s := subscriptions.Subscription{}
err := json.NewDecoder(bytes.NewReader(v)).Decode(&s)
if err != nil {
c.log.Error("Could not deserialize Subscription state!", zap.Error(err), zap.String("key", k), zap.String("value", string(v)))
Expand All @@ -148,7 +148,7 @@ func (c *subscriptionCache) Del(k string, v []byte) {
c.Lock()
defer c.Unlock()

oldSub := pubsub.Subscription{}
oldSub := subscriptions.Subscription{}
err := json.NewDecoder(bytes.NewReader(v)).Decode(&oldSub)
if err != nil {
c.log.Error("Could not deserialize Subscription state during deletion!", zap.Error(err), zap.String("key", k))
Expand Down
10 changes: 5 additions & 5 deletions internal/cache/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (

"github.com/serverless/event-gateway/functions"
"github.com/serverless/event-gateway/internal/kv"
"github.com/serverless/event-gateway/pubsub"
"github.com/serverless/event-gateway/subscriptions"
)

// Targeter is an interface for retrieving cached configuration for driving performance-sensitive routing decisions.
type Targeter interface {
BackingFunction(endpoint pubsub.EndpointID) *functions.FunctionID
BackingFunction(endpoint subscriptions.EndpointID) *functions.FunctionID
Function(functionID functions.FunctionID) *functions.Function
SubscribersOfTopic(topic pubsub.TopicID) []functions.FunctionID
SubscribersOfTopic(topic subscriptions.TopicID) []functions.FunctionID
}

// Target is an implementation of Targeter using the docker/libkv library for watching data in etcd, zookeeper, and
Expand All @@ -30,7 +30,7 @@ type Target struct {

// BackingFunction returns functions and their weights, along with the group ID if this was a Group function target, so
// we can submit events to topics that are fed by both.
func (tc *Target) BackingFunction(endpointID pubsub.EndpointID) *functions.FunctionID {
func (tc *Target) BackingFunction(endpointID subscriptions.EndpointID) *functions.FunctionID {
// try to get the endpoint from our cache
tc.endpointCache.RLock()
defer tc.endpointCache.RUnlock()
Expand All @@ -49,7 +49,7 @@ func (tc *Target) Function(functionID functions.FunctionID) *functions.Function
}

// SubscribersOfTopic is used for determining which functions to forward messages in a topic to.
func (tc *Target) SubscribersOfTopic(topic pubsub.TopicID) []functions.FunctionID {
func (tc *Target) SubscribersOfTopic(topic subscriptions.TopicID) []functions.FunctionID {
tc.subscriptionCache.RLock()
fnSet, exists := tc.subscriptionCache.topicToFns[topic]
tc.subscriptionCache.RUnlock()
Expand Down
3 changes: 0 additions & 3 deletions pubsub/mock/mock.go

This file was deleted.

157 changes: 0 additions & 157 deletions pubsub/mock/store.go

This file was deleted.

4 changes: 2 additions & 2 deletions router/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/satori/go.uuid"
"github.com/serverless/event-gateway/pubsub"
"github.com/serverless/event-gateway/subscriptions"
)

// Event is a default event structure. All data that passes through the Event Gateway is formatted as an Event, based on this schema.
Expand Down Expand Up @@ -83,6 +83,6 @@ func fromRequest(r *http.Request) (*Event, error) {
}

type event struct {
topics []pubsub.TopicID
topics []subscriptions.TopicID
payload []byte
}
18 changes: 9 additions & 9 deletions router/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/serverless/event-gateway/internal/kv"
"github.com/serverless/event-gateway/internal/metrics"
"github.com/serverless/event-gateway/internal/sync"
"github.com/serverless/event-gateway/pubsub"
"github.com/serverless/event-gateway/subscriptions"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -83,12 +83,12 @@ func TestIntegrationSubscription(t *testing.T) {
// set up pub/sub
eventName := "smileys"

post(testAPIServer.URL+"/v1/subscriptions", pubsub.Subscription{
post(testAPIServer.URL+"/v1/subscriptions", subscriptions.Subscription{
FunctionID: subscriberFnID,
Event: pubsub.TopicID(eventName),
Event: subscriptions.TopicID(eventName),
})

wait10Seconds(router.WaitForSubscriber(pubsub.TopicID(eventName)),
wait10Seconds(router.WaitForSubscriber(subscriptions.TopicID(eventName)),
"timed out waiting for subscriber to be configured!")

emit(testRouterServer.URL, eventName, []byte(expected))
Expand Down Expand Up @@ -129,15 +129,15 @@ func TestIntegrationHTTPSubscription(t *testing.T) {
},
})

post(testAPIServer.URL+"/v1/subscriptions", pubsub.Subscription{
post(testAPIServer.URL+"/v1/subscriptions", subscriptions.Subscription{
FunctionID: functions.FunctionID("supersmileyfunction"),
Event: "http",
Method: "POST",
Path: "/smilez",
})

select {
case <-router.WaitForEndpoint(pubsub.NewEndpointID("POST", "/smilez")):
case <-router.WaitForEndpoint(subscriptions.NewEndpointID("POST", "/smilez")):
case <-time.After(10 * time.Second):
panic("timed out waiting for endpoint to be configured!")
}
Expand Down Expand Up @@ -220,15 +220,15 @@ func newConfigAPIServer(kvstore store.Store, log *zap.Logger) *httptest.Server {
fnsapi := &functions.HTTPAPI{Functions: fns}
fnsapi.RegisterRoutes(apiRouter)

ps := &pubsub.PubSub{
subs := &subscriptions.Subscriptions{
TopicsDB: kv.NewPrefixedStore("/serverless-event-gateway/topics", kvstore),
SubscriptionsDB: kv.NewPrefixedStore("/serverless-event-gateway/subscriptions", kvstore),
EndpointsDB: kv.NewPrefixedStore("/serverless-event-gateway/endpoints", kvstore),
FunctionsDB: fnsDB,
Log: log,
}
psapi := &pubsub.HTTPAPI{PubSub: ps}
psapi.RegisterRoutes(apiRouter)
subsapi := &subscriptions.HTTPAPI{Subscriptions: subs}
subsapi.RegisterRoutes(apiRouter)

return httptest.NewServer(apiRouter)
}
Expand Down
Loading

0 comments on commit 7c225f0

Please sign in to comment.