Skip to content

Commit

Permalink
bulker: ingest: more flexible logic for finding stream
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Sep 11, 2023
1 parent 3519f8e commit c4b7526
Showing 1 changed file with 82 additions and 35 deletions.
117 changes: 82 additions & 35 deletions bulkerapp/app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,41 +288,7 @@ func (r *Router) IngestHandler(c *gin.Context) {
r.Debugf("[ingest] Message ID: %s Domain: %s", messageId, domain)
logFormat := "[ingest] Message ID: %s Domain: %s"

var stream *StreamWithDestinations
if ingestMessage.WriteKey != "" {
parts := strings.Split(ingestMessage.WriteKey, ":")
if len(parts) == 1 {
stream, err = r.fastStore.GetStreamById(ingestMessage.WriteKey)
} else {
var binding *ApiKeyBinding
binding, err = r.fastStore.getStreamByKeyId(parts[0])
if err == nil && binding != nil {
if binding.KeyType != ingestMessage.IngestType {
err = fmt.Errorf("invalid key type: found %s, expected %s", binding.KeyType, ingestMessage.IngestType)
} else if !r.checkHash(binding.Hash, parts[1]) {
err = fmt.Errorf("invalid key secret")
} else {
stream, err = r.fastStore.GetStreamById(binding.StreamId)
}
}
}
} else if ingestMessage.Origin.Slug != "" {
eventsLogId = ingestMessage.Origin.Slug
stream, err = r.fastStore.GetStreamById(ingestMessage.Origin.Slug)
} else if ingestMessage.Origin.Domain != "" {
var streams []StreamWithDestinations
streams, err = r.fastStore.GetStreamsByDomain(ingestMessage.Origin.Domain)
if len(streams) > 1 {
rError = r.ResponseError(c, http.StatusBadRequest, "error getting stream", false, fmt.Errorf("multiple streams found for domain %s. Please use 'writeKey' message property to select a concrete stream", ingestMessage.Origin.Domain), logFormat, messageId, domain)
return
} else if len(streams) == 1 {
stream = &streams[0]
}
}
if err != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "error getting stream", false, err, logFormat, messageId, domain)
return
}
stream := r.getStream(ingestMessage)
if stream == nil {
rError = r.ResponseError(c, http.StatusNotFound, "stream not found", false, nil, logFormat, messageId, domain)
return
Expand Down Expand Up @@ -580,3 +546,84 @@ type IngestMessage struct {
HttpPayload map[string]any `json:"httpPayload"`
Geo map[string]any `json:"geo"`
}

type StreamCredentials struct {
Slug string `json:"slug"`
Domain string `json:"domain"`
WriteKey string `json:"writeKey"`
IngestType string `json:"ingestType"`
}

type StreamLocator func(message IngestMessage) *StreamWithDestinations

func (r *Router) getStream(ingestMessage IngestMessage) *StreamWithDestinations {
var locators []StreamLocator
if ingestMessage.IngestType == "s2s" {
locators = []StreamLocator{r.WriteKeyStreamLocator, r.SlugStreamLocator, r.DomainStreamLocator}
} else {
locators = []StreamLocator{r.SlugStreamLocator, r.DomainStreamLocator, r.WriteKeyStreamLocator}
}
for _, locator := range locators {
stream := locator(ingestMessage)
if stream != nil {
return stream
}
}
return nil
}

func (r *Router) WriteKeyStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations {
if ingestMessage.WriteKey != "" {
parts := strings.Split(ingestMessage.WriteKey, ":")
if len(parts) == 1 {
stream, err := r.fastStore.GetStreamById(ingestMessage.WriteKey)
if err != nil {
r.Errorf("error getting stream: %v", err)
} else {
return stream
}
} else {
var binding *ApiKeyBinding
binding, err := r.fastStore.getStreamByKeyId(parts[0])
if err == nil && binding != nil {
if binding.KeyType != ingestMessage.IngestType {
r.Errorf("invalid key type: found %s, expected %s", binding.KeyType, ingestMessage.IngestType)
} else if !r.checkHash(binding.Hash, parts[1]) {
r.Errorf("invalid key secret")
} else {
stream, err := r.fastStore.GetStreamById(binding.StreamId)
if err != nil {
r.Errorf("error getting stream: %v", err)
} else {
return stream
}
}
}
}
}
return nil
}

func (r *Router) SlugStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations {
if ingestMessage.Origin.Slug != "" {
stream, err := r.fastStore.GetStreamById(ingestMessage.Origin.Slug)
if err != nil {
r.Errorf("error getting stream: %v", err)
} else {
return stream
}
}
return nil
}

func (r *Router) DomainStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations {
if ingestMessage.Origin.Domain != "" {
streams, err := r.fastStore.GetStreamsByDomain(ingestMessage.Origin.Domain)
if err != nil {
r.Errorf("error getting stream: %v", err)
} else if len(streams) == 1 {
return &streams[0]
}
}
return nil
}

0 comments on commit c4b7526

Please sign in to comment.