Skip to content

Commit

Permalink
Allow disabling Kafka integration (#178)
Browse files Browse the repository at this point in the history
* Allow disabling producing to Kafka in configuration

* Fix passing kafka.Writer containing a lock by value

* Change DisableKafka to EnableKafka

* Add missing License and copyright info
  • Loading branch information
ghislainbourgeois authored Sep 23, 2023
1 parent 3fa04ea commit 59f771e
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 5 deletions.
7 changes: 4 additions & 3 deletions factory/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ type Mongodb struct {
}

type KafkaInfo struct {
BrokerUri string `yaml:"brokerUri,omitempty"`
BrokerPort int `yaml:"brokerPort,omitempty"`
Topic string `yaml:"topicName,omitempty"`
BrokerUri string `yaml:"brokerUri,omitempty"`
BrokerPort int `yaml:"brokerPort,omitempty"`
Topic string `yaml:"topicName,omitempty"`
EnableKafka *bool `yaml:"enableKafka,omitempty"`
}

type Configuration struct {
Expand Down
10 changes: 10 additions & 0 deletions factory/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,13 @@ func TestCompareGenericSlices(t *testing.T) {
fmt.Println("Generic, The Links match")
}
}

func TestKafkaEnabledByDefault(t *testing.T) {
err := InitConfigFactory("../config/smfcfg.yaml")
if err != nil {
t.Errorf("Could not load default configuration file: %v", err)
}
if *SmfConfig.Configuration.KafkaInfo.EnableKafka != true {
t.Errorf("Expected Kafka to be enabled by default, was disabled")
}
}
5 changes: 5 additions & 0 deletions factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func InitConfigFactory(f string) error {
return yamlErr
}

if SmfConfig.Configuration.KafkaInfo.EnableKafka == nil {
enableKafka := true
SmfConfig.Configuration.KafkaInfo.EnableKafka = &enableKafka
}

roc := os.Getenv("MANAGED_BY_CONFIG_POD")
if roc == "true" {
gClient := client.ConnectToConfigServer("webui:9876")
Expand Down
20 changes: 18 additions & 2 deletions metrics/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ import (
)

type Writer struct {
kafkaWriter kafka.Writer
kafkaWriter *kafka.Writer
}

var StatWriter Writer

func InitialiseKafkaStream(config *factory.Configuration) error {

if *config.KafkaInfo.EnableKafka == false {
return nil
}

brokerUrl := "sd-core-kafka-headless:9092"
topicName := "sdcore-data-source-smf"

Expand All @@ -50,7 +54,7 @@ func InitialiseKafkaStream(config *factory.Configuration) error {
}

StatWriter = Writer{
kafkaWriter: producer,
kafkaWriter: &producer,
}
return nil
}
Expand All @@ -61,6 +65,9 @@ func GetWriter() Writer {
}

func (writer Writer) SendMessage(message []byte) error {
if *factory.SmfConfig.Configuration.KafkaInfo.EnableKafka == false {
return nil
}
msg := kafka.Message{Value: message}
if err := writer.kafkaWriter.WriteMessages(context.Background(), msg); err != nil {
logger.KafkaLog.Errorf("kafka send message write error: [%v] ", err.Error())
Expand All @@ -71,6 +78,9 @@ func (writer Writer) SendMessage(message []byte) error {

func (writer Writer) PublishPduSessEvent(ctxt mi.CoreSubscriber, op mi.SubscriberOp) error {

if *factory.SmfConfig.Configuration.KafkaInfo.EnableKafka == false {
return nil
}
smKafkaEvt := mi.MetricEvent{EventType: mi.CSubscriberEvt,
SubscriberData: mi.CoreSubscriberData{Subscriber: ctxt, Operation: op}}
if msg, err := json.Marshal(smKafkaEvt); err != nil {
Expand All @@ -92,6 +102,9 @@ func SetNfInstanceId(s string) {

func PublishMsgEvent(msgType mi.SmfMsgType) error {

if *factory.SmfConfig.Configuration.KafkaInfo.EnableKafka == false {
return nil
}
smKafkaMsgEvt := mi.MetricEvent{EventType: mi.CMsgTypeEvt, MsgType: mi.CoreMsgType{MsgType: msgType.String(), SourceNfId: nfInstanceId}}
if msg, err := json.Marshal(smKafkaMsgEvt); err != nil {
logger.KafkaLog.Errorf("publishing msg event marshal error [%v] ", err.Error())
Expand All @@ -105,6 +118,9 @@ func PublishMsgEvent(msgType mi.SmfMsgType) error {

func (writer Writer) PublishNfStatusEvent(msgEvent mi.MetricEvent) error {

if *factory.SmfConfig.Configuration.KafkaInfo.EnableKafka == false {
return nil
}
if msg, err := json.Marshal(msgEvent); err != nil {
logger.KafkaLog.Errorf("publishing nf status marshal error [%v] ", err.Error())
return err
Expand Down
120 changes: 120 additions & 0 deletions metrics/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// SPDX-FileCopyrightText: 2023 Open Networking Foundation <info@opennetworking.org>
// SPDX-License-Identifier: Apache-2.0

package metrics

import (
"testing"

"github.com/omec-project/metricfunc/pkg/metricinfo"
"github.com/omec-project/smf/factory"
)

var my_false bool = false

func TestInitializeKafkaStreamWithKafkaDisabled(t *testing.T) {
config := factory.Configuration{
KafkaInfo: factory.KafkaInfo{
EnableKafka: &my_false,
},
}

result := InitialiseKafkaStream(&config)

if result != nil {
t.Errorf("Expected return value to be nil, got %v", result)
}
if StatWriter.kafkaWriter != nil {
t.Errorf("Expected kafkaWrite to be nil, got %v", StatWriter.kafkaWriter)
}
}

func TestSendMessageWithKafkaDisabled(t *testing.T) {
configuration := factory.Configuration{
KafkaInfo: factory.KafkaInfo{
EnableKafka: &my_false,
},
}
config := factory.Config{
Configuration: &configuration,
}
factory.SmfConfig = config

InitialiseKafkaStream(&configuration)

writer := GetWriter()

// If the kafkaWriter is called, this will panic and fail the test
result := writer.SendMessage([]byte{0xFF})

if result != nil {
t.Errorf("Expected return value to be nil, got %v", result)
}
}

func TestPublishPduSessEventWithKafkaDisabled(t *testing.T) {
configuration := factory.Configuration{
KafkaInfo: factory.KafkaInfo{
EnableKafka: &my_false,
},
}
config := factory.Config{
Configuration: &configuration,
}
factory.SmfConfig = config

InitialiseKafkaStream(&configuration)

writer := GetWriter()

// If the kafkaWriter is called, this will panic and fail the test
result := writer.PublishPduSessEvent(metricinfo.CoreSubscriber{}, 0)

if result != nil {
t.Errorf("Expected return value to be nil, got %v", result)
}
}

func TestPublishMsgEventWithKafkaDisabled(t *testing.T) {
configuration := factory.Configuration{
KafkaInfo: factory.KafkaInfo{
EnableKafka: &my_false,
},
}
config := factory.Config{
Configuration: &configuration,
}
factory.SmfConfig = config

InitialiseKafkaStream(&configuration)

// If the kafkaWriter is called, this will panic and fail the test
result := PublishMsgEvent(0)

if result != nil {
t.Errorf("Expected return value to be nil, got %v", result)
}
}

func TestPublishNfStatusWithKafkaDisabled(t *testing.T) {
configuration := factory.Configuration{
KafkaInfo: factory.KafkaInfo{
EnableKafka: &my_false,
},
}
config := factory.Config{
Configuration: &configuration,
}
factory.SmfConfig = config

InitialiseKafkaStream(&configuration)

writer := GetWriter()

// If the kafkaWriter is called, this will panic and fail the test
result := writer.PublishNfStatusEvent(metricinfo.MetricEvent{})

if result != nil {
t.Errorf("Expected return value to be nil, got %v", result)
}
}

0 comments on commit 59f771e

Please sign in to comment.