Skip to content

Commit

Permalink
Set up non-deterministic testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Ergels Gaxhaj committed Feb 4, 2022
1 parent c2fe76f commit d11bcf8
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 8 deletions.
2 changes: 1 addition & 1 deletion baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
// each listener has an internal wait threshold after which it sends `false` to the ListenSuccess() channel
// but the BaseApp also imposes a global wait limit
if app.globalWaitLimit > 0 {
maxWait := time.NewTicker(app.globalWaitLimit)
maxWait := time.NewTicker(app.globalWaitLimit * time.Millisecond)
defer maxWait.Stop()
for _, lis := range app.abciListeners {
select {
Expand Down
2 changes: 1 addition & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type BaseApp struct { // nolint: maligned
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener

// globalWaitTime is the maximum amount of time the BaseApp will wait for positive acknowledgement of message
// globalWaitLimit is the maximum amount of time the BaseApp will wait for positive acknowledgement of message
// receipt from ABCIListeners before halting
globalWaitLimit time.Duration
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ type PluginLoader struct {
// NewPluginLoader creates new plugin loader
func NewPluginLoader(opts serverTypes.AppOptions, logger logging.Logger) (*PluginLoader, error) {
loader := &PluginLoader{plugins: make(map[string]plugin.Plugin, len(preloadPlugins)), opts: opts, logger: logger}
loader.disabled = cast.ToStringSlice(opts.Get(fmt.Sprintf("%s.%s", plugin.PLUGINS_TOML_KEY, plugin.PLUGINS_DISABLED_TOML_KEY)))
for _, v := range preloadPlugins {
if err := loader.Load(v); err != nil {
return nil, err
}
}
loader.disabled = cast.ToStringSlice(opts.Get(plugin.PLUGINS_DISABLED_TOML_KEY))
pluginDir := cast.ToString(opts.Get(plugin.PLUGINS_DIR_TOML_KEY))
if pluginDir == "" {
pluginDir = filepath.Join(os.Getenv("GOPATH"), plugin.DEFAULT_PLUGINS_DIRECTORY)
Expand Down Expand Up @@ -138,7 +138,7 @@ func (loader *PluginLoader) Load(pl plugin.Plugin) error {
name, ppl.Version(), pl.Version())
}
if sliceContainsStr(loader.disabled, name) {
loader.logger.Info("not loading disabled plugin", "plugin name", name)
loader.logger.Info("not loading disabled plugin", "name", name)
return nil
}
loader.plugins[name] = pl
Expand Down
2 changes: 1 addition & 1 deletion plugin/plugins/file/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewFileStreamingService(writeDir, filePrefix string, storeKeys []types.Stor
stateCache: make([][]byte, 0),
stateCacheLock: new(sync.Mutex),
ack: ack,
ackChan: make(chan bool),
ackChan: make(chan bool, 1),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/plugins/kafka/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func NewKafkaStreamingService(
stateCache: make([][]byte, 0),
stateCacheLock: new(sync.Mutex),
ack: ack,
ackChan: make (chan bool),
ackChan: make (chan bool, 1),
}

go func() {
Expand Down
2 changes: 1 addition & 1 deletion plugin/plugins/trace/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewTraceStreamingService(
stateCacheLock: new(sync.Mutex),
printDataToStdout: printDataToStdout,
ack: ack,
ackChan: make(chan bool),
ackChan: make(chan bool, 1),
}

return tss, nil
Expand Down
153 changes: 152 additions & 1 deletion simapp/sim_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package simapp

import (
"context"
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/cosmos/cosmos-sdk/plugin"
"github.com/cosmos/cosmos-sdk/server/types"
"github.com/spf13/cast"
"github.com/spf13/viper"
tmos "github.com/tendermint/tendermint/libs/os"
"math/rand"
"os"
"path/filepath"
"runtime/debug"
"strings"
"testing"
"time"

storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/stretchr/testify/require"
Expand All @@ -33,6 +42,9 @@ import (
"github.com/cosmos/cosmos-sdk/x/simulation"
slashingtypes "github.com/cosmos/cosmos-sdk/x/slashing/types"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"

kafkaplugin "github.com/cosmos/cosmos-sdk/plugin/plugins/kafka"
kafkaservice "github.com/cosmos/cosmos-sdk/plugin/plugins/kafka/service"
)

// Get flags every time the simulator is run
Expand Down Expand Up @@ -310,8 +322,21 @@ func TestAppStateDeterminism(t *testing.T) {
logger = log.NewNopLogger()
}

appOpts := loadAppOptions()
disabledPlugins := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("%s.%s", plugin.PLUGINS_TOML_KEY, plugin.PLUGINS_DISABLED_TOML_KEY)))
var kafkaDisabled bool = false
for _, p := range disabledPlugins {
if kafkaplugin.PLUGIN_NAME == p {
kafkaDisabled = true
break
}
}
if !kafkaDisabled {
prepKafkaTopics(appOpts)
}

db := dbm.NewMemDB()
app := NewSimApp(logger, db, nil, true, map[int64]bool{}, DefaultNodeHome, FlagPeriodValue, MakeTestEncodingConfig(), EmptyAppOptions{}, interBlockCacheOpt())
app := NewSimApp(logger, db, nil, true, map[int64]bool{}, DefaultNodeHome, FlagPeriodValue, MakeTestEncodingConfig(), appOpts, interBlockCacheOpt())

fmt.Printf(
"running non-determinism simulation; seed %d: %d/%d, attempt: %d/%d\n",
Expand Down Expand Up @@ -347,3 +372,129 @@ func TestAppStateDeterminism(t *testing.T) {
}
}
}

// loadAppOptions reads the plugin configuration from app.toml in the user's
// home directory and returns it as the test's AppOptions. The process exits
// (via tmos.Exit) if the home directory cannot be resolved or the config file
// cannot be read/parsed, since the simulation cannot proceed without it.
func loadAppOptions() types.AppOptions {
	// BUG FIX: the error from os.UserHomeDir was silently discarded, which
	// would later surface as a confusing "cannot read /app.toml" failure.
	usrHomeDir, err := os.UserHomeDir()
	if err != nil {
		tmos.Exit(err.Error())
	}
	confFile := filepath.Join(usrHomeDir, "app.toml")
	vpr := viper.New()
	vpr.SetConfigFile(confFile)
	if err := vpr.ReadInConfig(); err != nil {
		tmos.Exit(err.Error())
	}
	return vpr
}

// prepKafkaTopics resets the Kafka topics used by the streaming plugin so that
// each simulation run starts from empty topics: it deletes any existing
// prefixed topics and then recreates them. Topic prefix and bootstrap servers
// are read from the plugin section of the app config (see loadAppOptions).
func prepKafkaTopics(opts types.AppOptions) {
	// kafka topic setup
	topicPrefix := cast.ToString(opts.Get(fmt.Sprintf("%s.%s.%s.%s", plugin.PLUGINS_TOML_KEY, plugin.STREAMING_TOML_KEY, kafkaplugin.PLUGIN_NAME, kafkaplugin.TOPIC_PREFIX_PARAM)))
	bootstrapServers := cast.ToString(opts.Get(fmt.Sprintf("%s.%s.%s.%s.%s", plugin.PLUGINS_TOML_KEY, plugin.STREAMING_TOML_KEY, kafkaplugin.PLUGIN_NAME, kafkaplugin.PRODUCER_CONFIG_PARAM, "bootstrap_servers")))
	// NOTE(review): assumes underscores in the configured server list stand in
	// for dots (a TOML-key-safe encoding of host addresses) — confirm against
	// whatever writes this config entry.
	bootstrapServers = strings.ReplaceAll(bootstrapServers, "_", ".")
	// NOTE(review): only BeginBlockReqTopic needs an explicit string
	// conversion here, which suggests its declared type differs from the
	// other topic constants — confirm in the kafkaservice package.
	topics := []string{
		string(kafkaservice.BeginBlockReqTopic),
		kafkaservice.BeginBlockResTopic,
		kafkaservice.DeliverTxReqTopic,
		kafkaservice.DeliverTxResTopic,
		kafkaservice.EndBlockReqTopic,
		kafkaservice.EndBlockResTopic,
		kafkaservice.StateChangeTopic,
	}
	// Delete-then-create guarantees the topics exist and are empty.
	deleteTopics(topicPrefix, topics, bootstrapServers)
	createTopics(topicPrefix, topics, bootstrapServers)
}

// createTopics creates the prefixed Kafka topics ("<prefix>-<topic>") on the
// cluster addressed by bootstrapServers. Topics that already exist are
// tolerated; any other per-topic error, or a failure to reach the cluster,
// exits the process via tmos.Exit.
func createTopics(topicPrefix string, topics []string, bootstrapServers string) {
	adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers":       bootstrapServers,
		"broker.version.fallback": "0.10.0.0",
		"api.version.fallback.ms": 0,
	})
	if err != nil {
		fmt.Printf("Failed to create Admin client: %s\n", err)
		tmos.Exit(err.Error())
	}
	defer adminClient.Close()

	// Contexts are used to abort or limit the amount of time
	// the Admin call blocks waiting for a result.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Wait for the operation to finish, or at most 60s. A typed constant
	// replaces the original fallible time.ParseDuration("60s") round-trip.
	const maxDuration = 60 * time.Second

	specs := make([]kafka.TopicSpecification, 0, len(topics))
	for _, s := range topics {
		specs = append(specs, kafka.TopicSpecification{
			Topic:             fmt.Sprintf("%s-%s", topicPrefix, s),
			NumPartitions:     1,
			ReplicationFactor: 1,
		})
	}

	results, err := adminClient.CreateTopics(ctx, specs, kafka.SetAdminOperationTimeout(maxDuration))
	if err != nil {
		fmt.Printf("Problem during the topic creation: %v\n", err)
		tmos.Exit(err.Error())
	}

	// Check per-topic results: "already exists" is fine, anything else is fatal.
	for _, result := range results {
		if result.Error.Code() != kafka.ErrNoError &&
			result.Error.Code() != kafka.ErrTopicAlreadyExists {
			fmt.Printf("Topic creation failed for %s: %v\n",
				result.Topic, result.Error.String())
			// BUG FIX: the original called tmos.Exit(err.Error()) here, but
			// err is necessarily nil at this point (CreateTopics succeeded);
			// report the per-topic error instead.
			tmos.Exit(result.Error.String())
		}
	}
}

// deleteTopics deletes the prefixed Kafka topics ("<prefix>-<topic>") from the
// cluster addressed by bootstrapServers. A failure to reach the cluster or to
// issue the request exits the process via tmos.Exit; per-topic deletion
// results (including errors such as unknown topic) are only printed, keeping
// the original best-effort behavior so a fresh cluster does not abort the test.
func deleteTopics(topicPrefix string, topics []string, bootstrapServers string) {
	// AdminClient can also be instantiated from an existing Producer or
	// Consumer; see NewAdminClientFromProducer / NewAdminClientFromConsumer.
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
	if err != nil {
		fmt.Printf("Failed to create Admin client: %s\n", err)
		tmos.Exit(err.Error())
	}
	// defer ensures the client is closed on every return path (the original
	// only closed it at the end of the happy path).
	defer a.Close()

	// Contexts are used to abort or limit the amount of time
	// the Admin call blocks waiting for a result.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Wait for the operation to finish, or at most 60s. A typed constant
	// replaces the original fallible time.ParseDuration("60s") round-trip.
	const maxDur = 60 * time.Second

	prefixed := make([]string, 0, len(topics))
	for _, s := range topics {
		prefixed = append(prefixed, fmt.Sprintf("%s-%s", topicPrefix, s))
	}

	results, err := a.DeleteTopics(ctx, prefixed, kafka.SetAdminOperationTimeout(maxDur))
	if err != nil {
		fmt.Printf("Failed to delete topics: %v\n", err)
		tmos.Exit(err.Error())
	}

	// Print per-topic results; deletion errors are reported but not fatal.
	for _, result := range results {
		fmt.Printf("%s\n", result)
	}
}

0 comments on commit d11bcf8

Please sign in to comment.