Skip to content

Commit

Permalink
Turn libbeat.Publisher into interface
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jul 12, 2016
1 parent ab702e2 commit d070c64
Show file tree
Hide file tree
Showing 14 changed files with 126 additions and 69 deletions.
10 changes: 5 additions & 5 deletions filebeat/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type LogPublisher interface {
}

type syncLogPublisher struct {
pub *publisher.Publisher
pub publisher.Publisher
client publisher.Client
in, out chan []*input.FileEvent

Expand All @@ -27,7 +27,7 @@ type syncLogPublisher struct {
}

type asyncLogPublisher struct {
pub *publisher.Publisher
pub publisher.Publisher
client publisher.Client
in, out chan []*input.FileEvent

Expand Down Expand Up @@ -71,7 +71,7 @@ var (
func New(
async bool,
in, out chan []*input.FileEvent,
pub *publisher.Publisher,
pub publisher.Publisher,
) LogPublisher {
if async {
return newAsyncLogPublisher(in, out, pub)
Expand All @@ -81,7 +81,7 @@ func New(

func newSyncLogPublisher(
in, out chan []*input.FileEvent,
pub *publisher.Publisher,
pub publisher.Publisher,
) *syncLogPublisher {
return &syncLogPublisher{
in: in,
Expand Down Expand Up @@ -138,7 +138,7 @@ func (p *syncLogPublisher) Stop() {

func newAsyncLogPublisher(
in, out chan []*input.FileEvent,
pub *publisher.Publisher,
pub publisher.Publisher,
) *asyncLogPublisher {
return &asyncLogPublisher{
in: in,
Expand Down
2 changes: 1 addition & 1 deletion filebeat/publish/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestPublisherModes(t *testing.T) {
regChan := make(chan []*input.FileEvent, len(test.order)+1)
client := pubtest.NewChanClient(0)

pub := New(test.async, pubChan, regChan, client)
pub := New(test.async, pubChan, regChan, pubtest.PublisherWithClient(client))
pub.Start()

var events [][]*input.FileEvent
Expand Down
12 changes: 6 additions & 6 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ type Creator func(*Beat, *common.Config) (Beater, error)
// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
Name string // Beat name.
Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version.
UUID uuid.UUID // ID assigned to a Beat instance.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher *publisher.Publisher // Publisher
Name string // Beat name.
Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version.
UUID uuid.UUID // ID assigned to a Beat instance.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher publisher.Publisher // Publisher
}

// BeatConfig struct contains the basic configuration of every beat
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (

type asyncPipeline struct {
outputs []worker
pub *Publisher
pub *BeatPublisher
}

const (
defaultBulkSize = 2048
)

func newAsyncPipeline(
pub *Publisher,
pub *BeatPublisher,
hwm, bulkHWM int,
ws *workerSignal,
) *asyncPipeline {
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ type Client interface {
type client struct {
canceler *op.Canceler

publisher *Publisher
publisher *BeatPublisher
beatMeta common.MapStr // Beat metadata that is added to all events.
globalEventMetadata common.EventMetadata // Fields and tags that are added to all events.
}

func newClient(pub *Publisher) *client {
func newClient(pub *BeatPublisher) *client {
c := &client{
canceler: op.NewCanceler(),

Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// ClientOptions.
func TestGetClient(t *testing.T) {
c := &client{
publisher: &Publisher{},
publisher: &BeatPublisher{},
}
c.publisher.pipelines.async = &asyncPipeline{}
c.publisher.pipelines.sync = &syncPipeline{}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func testEvent() common.MapStr {
}

type testPublisher struct {
pub *Publisher
pub *BeatPublisher
outputMsgHandler *testMessageHandler
client *client
}
Expand All @@ -147,7 +147,7 @@ const (
)

func newTestPublisher(bulkSize int, response OutputResponse) *testPublisher {
pub := &Publisher{}
pub := &BeatPublisher{}
pub.wsOutput.Init()
pub.wsPublisher.Init()

Expand Down
40 changes: 26 additions & 14 deletions libbeat/publisher/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ type TransactionalEventPublisher interface {
PublishTransaction(transaction op.Signaler, events []common.MapStr)
}

type Publisher struct {
type Publisher interface {
Connect() Client
}

type BeatPublisher struct {
shipperName string // Shipper name as set in the configuration file
hostname string // Host name as returned by the operation system
name string // The shipperName if configured, the hostname otherwise
Expand All @@ -55,8 +59,8 @@ type Publisher struct {
Index string
Output []*outputWorker
TopologyOutput outputs.TopologyOutputer
IgnoreOutgoing bool
GeoLite *libgeo.GeoIP
ignoreOutgoing bool
geoLite *libgeo.GeoIP
Processors *processors.Processors

globalEventMetadata common.EventMetadata // Fields and tags to add to each event.
Expand Down Expand Up @@ -107,7 +111,7 @@ func init() {
publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing")
}

func (publisher *Publisher) IsPublisherIP(ip string) bool {
func (publisher *BeatPublisher) IsPublisherIP(ip string) bool {
for _, myip := range publisher.IpAddrs {
if myip == ip {
return true
Expand All @@ -117,7 +121,7 @@ func (publisher *Publisher) IsPublisherIP(ip string) bool {
return false
}

func (publisher *Publisher) GetServerName(ip string) string {
func (publisher *BeatPublisher) GetServerName(ip string) string {
// in case the IP is localhost, return current shipper name
islocal, err := common.IsLoopback(ip)
if err != nil {
Expand All @@ -137,18 +141,26 @@ func (publisher *Publisher) GetServerName(ip string) string {
return ""
}

func (publisher *Publisher) Connect() Client {
func (publisher *BeatPublisher) GeoLite() *libgeo.GeoIP {
return publisher.geoLite
}

func (publisher *BeatPublisher) IgnoreOutgoing() bool {
return publisher.ignoreOutgoing
}

func (publisher *BeatPublisher) Connect() Client {
atomic.AddUint32(&publisher.numClients, 1)
return newClient(publisher)
}

func (publisher *Publisher) UpdateTopologyPeriodically() {
func (publisher *BeatPublisher) UpdateTopologyPeriodically() {
for range publisher.RefreshTopologyTimer {
_ = publisher.PublishTopology() // ignore errors
}
}

func (publisher *Publisher) PublishTopology(params ...string) error {
func (publisher *BeatPublisher) PublishTopology(params ...string) error {

localAddrs := params
if len(params) == 0 {
Expand Down Expand Up @@ -178,24 +190,24 @@ func New(
configs map[string]*common.Config,
shipper ShipperConfig,
processors *processors.Processors,
) (*Publisher, error) {
) (*BeatPublisher, error) {

publisher := Publisher{}
publisher := BeatPublisher{}
err := publisher.init(beatName, configs, shipper, processors)
if err != nil {
return nil, err
}
return &publisher, nil
}

func (publisher *Publisher) init(
func (publisher *BeatPublisher) init(
beatName string,
configs map[string]*common.Config,
shipper ShipperConfig,
processors *processors.Processors,
) error {
var err error
publisher.IgnoreOutgoing = shipper.Ignore_outgoing
publisher.ignoreOutgoing = shipper.Ignore_outgoing
publisher.Processors = processors

publisher.disabled = *publishDisabled
Expand All @@ -213,7 +225,7 @@ func (publisher *Publisher) init(
bulkHWM = *shipper.BulkQueueSize
}

publisher.GeoLite = common.LoadGeoIPData(shipper.Geoip)
publisher.geoLite = common.LoadGeoIPData(shipper.Geoip)

publisher.wsPublisher.Init()
publisher.wsOutput.Init()
Expand Down Expand Up @@ -321,7 +333,7 @@ func (publisher *Publisher) init(
return nil
}

func (publisher *Publisher) Stop() {
func (publisher *BeatPublisher) Stop() {
if atomic.LoadUint32(&publisher.numClients) > 0 {
panic("All clients must disconnect before shutting down publisher pipeline")
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (topo testTopology) GetNameByIP(ip string) string {

// Test GetServerName.
func TestPublisherTypeGetServerName(t *testing.T) {
pt := &Publisher{name: shipperName}
pt := &BeatPublisher{name: shipperName}
assert.Equal(t, shipperName, pt.GetServerName("127.0.0.1"))

// Unknown hosts return empty string.
Expand All @@ -57,7 +57,7 @@ func TestPublisherTypeUpdateTopologyPeriodically(t *testing.T) {
publishName: make(chan string, 1),
publishLocalAddrs: make(chan []string, 1),
}
pt := &Publisher{
pt := &BeatPublisher{
name: shipperName,
RefreshTopologyTimer: c,
TopologyOutput: topo,
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package publisher
import "github.com/elastic/beats/libbeat/common/op"

type syncPipeline struct {
pub *Publisher
pub *BeatPublisher
}

func newSyncPipeline(pub *Publisher, hwm, bulkHWM int) *syncPipeline {
func newSyncPipeline(pub *BeatPublisher, hwm, bulkHWM int) *syncPipeline {
return &syncPipeline{pub: pub}
}

Expand Down
12 changes: 12 additions & 0 deletions libbeat/publisher/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"github.com/elastic/beats/libbeat/publisher"
)

type TestPublisher struct {
client publisher.Client
}

// given channel only.
type ChanClient struct {
done chan struct{}
Expand All @@ -19,6 +23,14 @@ type PublishMessage struct {
Events []common.MapStr
}

func PublisherWithClient(client publisher.Client) publisher.Publisher {
return &TestPublisher{client}
}

func (pub *TestPublisher) Connect() publisher.Client {
return pub.client
}

func NewChanClient(bufSize int) *ChanClient {
return NewChanClientWith(make(chan PublishMessage, bufSize))
}
Expand Down
14 changes: 9 additions & 5 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
func (pb *Packetbeat) init(b *beat.Beat) error {

cfg := &pb.Config.Packetbeat

if err := procs.ProcWatcher.Init(cfg.Procs); err != nil {
err := procs.ProcWatcher.Init(cfg.Procs)
if err != nil {
logp.Critical(err.Error())
return err
}
Expand All @@ -113,16 +113,20 @@ func (pb *Packetbeat) init(b *beat.Beat) error {
if b.Config.Shipper.BulkQueueSize != nil {
bulkQueueSize = *b.Config.Shipper.BulkQueueSize
}
pb.Pub = publish.NewPublisher(b.Publisher, queueSize, bulkQueueSize)
pb.Pub, err = publish.NewPublisher(b.Publisher, queueSize, bulkQueueSize)
if err != nil {
return fmt.Errorf("Initializing publisher failed: %v", err)
}

logp.Debug("main", "Initializing protocol plugins")
err := protos.Protos.Init(false, pb.Pub, cfg.Protocols)
err = protos.Protos.Init(false, pb.Pub, cfg.Protocols)
if err != nil {
return fmt.Errorf("Initializing protocol analyzers failed: %v", err)
}

logp.Debug("main", "Initializing sniffer")
if err := pb.setupSniffer(); err != nil {
err = pb.setupSniffer()
if err != nil {
return fmt.Errorf("Initializing sniffer failed: %v", err)
}

Expand Down
Loading

0 comments on commit d070c64

Please sign in to comment.