Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpctest: integration test harness fixes #2071

Merged
merged 5 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions blockchain/sizehelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,32 @@ const (
loadFactorNum = 13
loadFactorDen = 2

// maxAlloc is the maximum size of an allocation. On 64-bit,
// it's theoretically possible to allocate 1<<heapAddrBits bytes. On
// 32-bit, however, this is one less than 1<<32 because the
// number of bytes in the address space doesn't actually fit
// in a uintptr.
// _64bit = 1 on 64-bit systems, 0 on 32-bit systems
_64bit = 1 << (^uintptr(0) >> 63) / 2

// PtrSize is the size of a pointer in bytes - unsafe.Sizeof(uintptr(0))
// but as an ideal constant. It is also the size of the machine's native
// word size (that is, 4 on 32-bit systems, 8 on 64-bit).
PtrSize = 4 << (^uintptr(0) >> 63)

// heapAddrBits is the number of bits in a heap address that's actually
// available for memory allocation.
//
// NOTE (kcalvinalvin): I just took the constant for a 64 bit system.
maxAlloc = 281474976710656
// NOTE (guggero): For 64-bit systems, we just assume 40 bits of address
// space available, as that seems to be the lowest common denominator.
// See heapAddrBits in runtime/malloc.go of the standard library for
// more details
heapAddrBits = 32 + (_64bit * 8)
guggero marked this conversation as resolved.
Show resolved Hide resolved

// maxAlloc is the maximum size of an allocation on the heap.
//
// NOTE(guggero): With the somewhat simplified heapAddrBits calculation
// above, this will currently limit the maximum allocation size of the
// UTXO cache to around 300GiB on 64-bit systems. This should be more
// than enough for the foreseeable future, but if we ever need to
// increase it, we should probably use the same calculation as the
// standard library.
maxAlloc = (1 << heapAddrBits) - (1-_64bit)*1
)

var class_to_size = [_NumSizeClasses]uint16{0, 8, 16, 24, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 288, 320, 352, 384, 416, 448, 480, 512, 576, 640, 704, 768, 896, 1024, 1152, 1280, 1408, 1536, 1792, 2048, 2304, 2688, 3072, 3200, 3456, 4096, 4864, 5376, 6144, 6528, 6784, 6912, 8192, 9472, 9728, 10240, 10880, 12288, 13568, 14336, 16384, 18432, 19072, 20480, 21760, 24576, 27264, 28672, 32768}
Expand Down Expand Up @@ -175,7 +193,7 @@ func calculateMinEntries(totalBytes int, bucketSize int) int {
// mulUintptr returns a * b and whether the multiplication overflowed.
// On supported platforms this is an intrinsic lowered by the compiler.
func mulUintptr(a, b uintptr) (uintptr, bool) {
if a|b < 1<<(4*8) || a == 0 {
if a|b < 1<<(4*PtrSize) || a == 0 {
return a * b, false
}
overflow := b > MaxUintptr/a
Expand Down
52 changes: 15 additions & 37 deletions integration/rpctest/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rpctest

import (
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
Expand All @@ -31,7 +30,7 @@ type nodeConfig struct {
profile string
debugLevel string
extra []string
prefix string
nodeDir string

exe string
endpoint string
Expand All @@ -41,7 +40,7 @@ type nodeConfig struct {
}

// newConfig returns a newConfig with all default values.
func newConfig(prefix, certFile, keyFile string, extra []string,
func newConfig(nodeDir, certFile, keyFile string, extra []string,
customExePath string) (*nodeConfig, error) {

var btcdPath string
Expand All @@ -61,7 +60,7 @@ func newConfig(prefix, certFile, keyFile string, extra []string,
rpcUser: "user",
rpcPass: "pass",
extra: extra,
prefix: prefix,
nodeDir: nodeDir,
exe: btcdPath,
endpoint: "ws",
certFile: certFile,
Expand All @@ -77,17 +76,9 @@ func newConfig(prefix, certFile, keyFile string, extra []string,
// temporary data, and log directories which must be cleaned up with a call to
// cleanup().
func (n *nodeConfig) setDefaults() error {
datadir, err := ioutil.TempDir("", n.prefix+"-data")
if err != nil {
return err
}
n.dataDir = datadir
logdir, err := ioutil.TempDir("", n.prefix+"-logs")
if err != nil {
return err
}
n.logDir = logdir
cert, err := ioutil.ReadFile(n.certFile)
n.dataDir = filepath.Join(n.nodeDir, "data")
n.logDir = filepath.Join(n.nodeDir, "logs")
cert, err := os.ReadFile(n.certFile)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,22 +154,7 @@ func (n *nodeConfig) rpcConnConfig() rpc.ConnConfig {

// String returns the string representation of this nodeConfig.
func (n *nodeConfig) String() string {
return n.prefix
}

// cleanup removes the tmp data and log directories.
func (n *nodeConfig) cleanup() error {
dirs := []string{
n.logDir,
n.dataDir,
}
var err error
for _, dir := range dirs {
if err = os.RemoveAll(dir); err != nil {
log.Printf("Cannot remove dir %s: %v", dir, err)
}
}
return err
return n.nodeDir
}

// node houses the necessary state required to configure, launch, and manage a
Expand Down Expand Up @@ -213,8 +189,7 @@ func (n *node) start() error {
return err
}

pid, err := os.Create(filepath.Join(n.dataDir,
fmt.Sprintf("%s.pid", n.config)))
pid, err := os.Create(filepath.Join(n.dataDir, "btcd.pid"))
if err != nil {
return err
}
Expand Down Expand Up @@ -258,7 +233,10 @@ func (n *node) cleanup() error {
}
}

return n.config.cleanup()
// Since the node's main data directory is passed in to the node config,
// it isn't our responsibility to clean it up. So we're done after
// removing the pid file.
return nil
}

// shutdown terminates the running btcd process, and cleans up all
Expand All @@ -283,11 +261,11 @@ func genCertPair(certFile, keyFile string) error {
}

// Write cert and key files.
if err = ioutil.WriteFile(certFile, cert, 0666); err != nil {
if err = os.WriteFile(certFile, cert, 0666); err != nil {
return err
}
if err = ioutil.WriteFile(keyFile, key, 0600); err != nil {
os.Remove(certFile)
if err = os.WriteFile(keyFile, key, 0600); err != nil {
_ = os.Remove(certFile)
return err
}

Expand Down
134 changes: 121 additions & 13 deletions integration/rpctest/rpc_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rpctest

import (
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
Expand Down Expand Up @@ -152,8 +151,7 @@ func New(activeNet *chaincfg.Params, handlers *rpcclient.NotificationHandlers,
return nil, err
}

harnessID := strconv.Itoa(numTestInstances)
nodeTestData, err := ioutil.TempDir(testDir, "harness-"+harnessID)
nodeTestData, err := os.MkdirTemp(testDir, "rpc-node")
if err != nil {
return nil, err
}
Expand All @@ -173,7 +171,7 @@ func New(activeNet *chaincfg.Params, handlers *rpcclient.NotificationHandlers,
extraArgs = append(extraArgs, miningAddr)

config, err := newConfig(
"rpctest", certFile, keyFile, extraArgs, customExePath,
nodeTestData, certFile, keyFile, extraArgs, customExePath,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -248,10 +246,10 @@ func (h *Harness) SetUp(createTestChain bool, numMatureOutputs uint32) error {
// Start the btcd node itself. This spawns a new process which will be
// managed
if err := h.node.start(); err != nil {
return err
return fmt.Errorf("error starting node: %w", err)
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
}
if err := h.connectRPCClient(); err != nil {
return err
return fmt.Errorf("error connecting RPC client: %w", err)
}

h.wallet.Start()
Expand All @@ -272,8 +270,8 @@ func (h *Harness) SetUp(createTestChain bool, numMatureOutputs uint32) error {
// Create a test chain with the desired number of mature coinbase
// outputs.
if createTestChain && numMatureOutputs != 0 {
numToGenerate := (uint32(h.ActiveNet.CoinbaseMaturity) +
numMatureOutputs)
coinbaseMaturity := uint32(h.ActiveNet.CoinbaseMaturity)
numToGenerate := coinbaseMaturity + numMatureOutputs
_, err := h.Client.Generate(numToGenerate)
if err != nil {
return err
Expand Down Expand Up @@ -351,15 +349,18 @@ func (h *Harness) connectRPCClient() error {
batchConf.HTTPPostMode = true
for i := 0; i < h.MaxConnRetries; i++ {
fail := false
timeout := time.Duration(i) * h.ConnectionRetryTimeout
if client == nil {
if client, err = rpcclient.New(&rpcConf, h.handlers); err != nil {
time.Sleep(time.Duration(i) * h.ConnectionRetryTimeout)
client, err = rpcclient.New(&rpcConf, h.handlers)
if err != nil {
time.Sleep(timeout)
fail = true
}
}
if batchClient == nil {
if batchClient, err = rpcclient.NewBatch(&batchConf); err != nil {
time.Sleep(time.Duration(i) * h.ConnectionRetryTimeout)
batchClient, err = rpcclient.NewBatch(&batchConf)
if err != nil {
time.Sleep(timeout)
fail = true
}
}
Expand All @@ -369,7 +370,9 @@ func (h *Harness) connectRPCClient() error {
}

if client == nil || batchClient == nil {
return fmt.Errorf("connection timeout")
return fmt.Errorf("connection timeout, tried %d times with "+
"timeout %v, last err: %w", h.MaxConnRetries,
h.ConnectionRetryTimeout, err)
}

h.Client = client
Expand Down Expand Up @@ -558,6 +561,111 @@ func NextAvailablePort() int {
panic("no ports available for listening")
}

// NextAvailablePortForProcess returns the first port that is available for
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
// listening by a new node, using a lock file to make sure concurrent access for
// parallel tasks within the same process don't re-use the same port. It panics
// if no port is found and the maximum available TCP port is reached.
func NextAvailablePortForProcess(pid int) int {
lockFile := filepath.Join(
os.TempDir(), fmt.Sprintf("rpctest-port-pid-%d.lock", pid),
)
timeout := time.After(time.Second)

var (
lockFileHandle *os.File
err error
)
for {
// Attempt to acquire the lock file. If it already exists, wait
// for a bit and retry.
lockFileHandle, err = os.OpenFile(
lockFile, os.O_CREATE|os.O_EXCL, 0600,
)
if err == nil {
// Lock acquired.
break
}

// Wait for a bit and retry.
select {
case <-timeout:
panic("timeout waiting for lock file")
case <-time.After(10 * time.Millisecond):
}
}

// Release the lock file when we're done.
defer func() {
// Always close file first, Windows won't allow us to remove it
// otherwise.
_ = lockFileHandle.Close()
err := os.Remove(lockFile)
if err != nil {
panic(fmt.Errorf("couldn't remove lock file: %w", err))
}
}()

portFile := filepath.Join(
os.TempDir(), fmt.Sprintf("rpctest-port-pid-%d", pid),
)
port, err := os.ReadFile(portFile)
if err != nil {
if !os.IsNotExist(err) {
panic(fmt.Errorf("error reading port file: %w", err))
}
port = []byte(strconv.Itoa(int(defaultNodePort)))
}

lastPort, err := strconv.Atoi(string(port))
if err != nil {
panic(fmt.Errorf("error parsing port: %w", err))
}

// We take the next one.
lastPort++
for lastPort < 65535 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we just listen to port 0 it will give us a random free port? Something like this
https://github.com/phayes/freeport/blob/master/freeport.go#L8

or this,
https://gist.github.com/sevkin/96bdae9274465b2d09191384f86ef39d

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we do want ports in order so we can incrementally distribute them. Otherwise we'd need to keep a list of already used ports in the file instead of just the last one we used.

// If there are no errors while attempting to listen on this
// port, close the socket and return it as available. While it
// could be the case that some other process picks up this port
// between the time the socket is closed and it's reopened in
// the harness node, in practice in CI servers this seems much
// less likely than simply some other process already being
// bound at the start of the tests.
addr := fmt.Sprintf(ListenerFormat, lastPort)
l, err := net.Listen("tcp4", addr)
if err == nil {
err := l.Close()
if err == nil {
err := os.WriteFile(
portFile,
[]byte(strconv.Itoa(lastPort)), 0600,
)
if err != nil {
panic(fmt.Errorf("error updating "+
"port file: %w", err))
}

return lastPort
}
}
lastPort++
}

// No ports available? Must be a mistake.
panic("no ports available for listening")
}

// GenerateProcessUniqueListenerAddresses is a function that returns two
// listener addresses with unique ports per the given process id and should be
// used to overwrite rpctest's default generator which is prone to use colliding
// ports.
func GenerateProcessUniqueListenerAddresses(pid int) (string, string) {
port1 := NextAvailablePortForProcess(pid)
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
port2 := NextAvailablePortForProcess(pid)
return fmt.Sprintf(ListenerFormat, port1),
fmt.Sprintf(ListenerFormat, port2)
}

// baseDir is the directory path of the temp directory for all rpctest files.
func baseDir() (string, error) {
dirPath := filepath.Join(os.TempDir(), "btcd", "rpctest")
Expand Down
Loading