Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick optional ws url #1535

Merged
merged 11 commits into from
Nov 18, 2024
5 changes: 5 additions & 0 deletions .changeset/four-kangaroos-appear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add config validation so it requires ws url when http polling disabled #bugfix
8 changes: 8 additions & 0 deletions .changeset/moody-rules-agree.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": patch
---

- register polling subscription to avoid subscription leaking when rpc client gets closed.
- add a temporary special treatment for SubscribeNewHead before we replace it with SubscribeToHeads. Add a goroutine that forwards new head from poller to caller channel.
- fix a deadlock in poller, by using a new lock for subs slice in rpc client.
#bugfix
8 changes: 8 additions & 0 deletions .changeset/silly-lies-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": minor
---

Make websocket URL `WSURL` for `EVM.Nodes` optional, and apply logic so that:
* If WS URL was not provided, SubscribeFilterLogs should fail with an explicit error
* If WS URL was not provided LogBroadcaster should be disabled
#nops
7 changes: 6 additions & 1 deletion .github/workflows/ccip-offchain-upgrade-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ jobs:
if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch'
needs: [ changes ]
environment: integration
outputs:
tag: ${{ steps.build-test-image.outputs.test_image_tag }}
permissions:
id-token: write
contents: read
Expand All @@ -136,11 +138,14 @@ jobs:
repository: smartcontractkit/ccip
ref: ${{ github.event.pull_request.head.sha || github.sha }}
- name: Build Test Image
id: build-test-image
if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch'
uses: smartcontractkit/.github/actions/ctf-build-test-image@a5e4f4c8fbb8e15ab2ad131552eca6ac83c4f4b3 # ctf-build-test-image@0.1.0
with:
# we just want to build the load tests
suites: ccip-tests/load ccip-tests/smoke
repository: chainlink-ccip-tests
tag: ${{ github.sha }}
QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }}
QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }}
QA_AWS_ACCOUNT_NUMBER: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }}
Expand Down Expand Up @@ -249,7 +254,7 @@ jobs:
env:
BASE64_CONFIG_OVERRIDE: ${{ steps.set_override_config.outputs.base_64_override }},${{ steps.setup_create_base64_config_ccip.outputs.base64_config }}
TEST_BASE64_CONFIG_OVERRIDE: ${{ steps.set_override_config.outputs.base_64_override }},${{ steps.setup_create_base64_config_ccip.outputs.base64_config }}
ENV_JOB_IMAGE: ${{ env.ENV_JOB_IMAGE_BASE }}:${{ github.sha }}
ENV_JOB_IMAGE: ${{ env.ENV_JOB_IMAGE_BASE }}:${{ needs.build-test-image.outputs.tag }}
TEST_SUITE: smoke
TEST_ARGS: -test.timeout 30m
TEST_LOG_LEVEL: info
Expand Down
17 changes: 11 additions & 6 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ type node[
services.StateMachine
lfcLog logger.Logger
name string
id int32
id int
chainID CHAIN_ID
nodePoolCfg NodeConfig
chainCfg ChainConfig
order int32
chainFamily string

ws url.URL
ws *url.URL
http *url.URL

rpc RPC
Expand All @@ -121,10 +121,10 @@ func NewNode[
nodeCfg NodeConfig,
chainCfg ChainConfig,
lggr logger.Logger,
wsuri url.URL,
wsuri *url.URL,
httpuri *url.URL,
name string,
id int32,
id int,
chainID CHAIN_ID,
nodeOrder int32,
rpc RPC,
Expand All @@ -136,8 +136,10 @@ func NewNode[
n.chainID = chainID
n.nodePoolCfg = nodeCfg
n.chainCfg = chainCfg
n.ws = wsuri
n.order = nodeOrder
if wsuri != nil {
n.ws = wsuri
}
if httpuri != nil {
n.http = httpuri
}
Expand All @@ -157,7 +159,10 @@ func NewNode[
}

func (n *node[CHAIN_ID, HEAD, RPC]) String() string {
s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String())
s := fmt.Sprintf("(%s)%s", Primary.String(), n.name)
if n.ws != nil {
s = s + fmt.Sprintf(":%s", n.ws.String())
}
if n.http != nil {
s = s + fmt.Sprintf(":%s", n.http.String())
}
Expand Down
4 changes: 2 additions & 2 deletions common/client/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ type testNodeOpts struct {
config testNodeConfig
chainConfig clientMocks.ChainConfig
lggr logger.Logger
wsuri url.URL
wsuri *url.URL
httpuri *url.URL
name string
id int32
id int
chainID types.ID
nodeOrder int32
rpc *mockNodeClient[types.ID, Head]
Expand Down
18 changes: 12 additions & 6 deletions core/chains/evm/client/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,21 @@ func NewClientConfigs(
func parseNodeConfigs(nodeCfgs []NodeConfig) ([]*toml.Node, error) {
nodes := make([]*toml.Node, len(nodeCfgs))
for i, nodeCfg := range nodeCfgs {
if nodeCfg.WSURL == nil || nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing WS or HTTP URL", i)
var wsURL, httpURL *commonconfig.URL
// wsUrl requirement will be checked in EVMConfig validation
if nodeCfg.WSURL != nil {
wsURL = commonconfig.MustParseURL(*nodeCfg.WSURL)
}
wsUrl := commonconfig.MustParseURL(*nodeCfg.WSURL)
httpUrl := commonconfig.MustParseURL(*nodeCfg.HTTPURL)

if nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing HTTP URL", i)
}

httpURL = commonconfig.MustParseURL(*nodeCfg.HTTPURL)
node := &toml.Node{
Name: nodeCfg.Name,
WSURL: wsUrl,
HTTPURL: httpUrl,
WSURL: wsURL,
HTTPURL: httpURL,
SendOnly: nodeCfg.SendOnly,
Order: nodeCfg.Order,
}
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ func TestNodeConfigs(t *testing.T) {
require.Len(t, tomlNodes, len(nodeConfigs))
})

t.Run("parsing missing ws url fails", func(t *testing.T) {
t.Run("ws can be optional", func(t *testing.T) {
nodeConfigs := []client.NodeConfig{
{
Name: ptr("foo1"),
HTTPURL: ptr("http://foo1.test"),
},
}
_, err := client.ParseTestNodeConfigs(nodeConfigs)
require.Error(t, err)
require.Nil(t, err)
})

t.Run("parsing missing http url fails", func(t *testing.T) {
Expand Down
11 changes: 7 additions & 4 deletions core/chains/evm/client/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@ import (
)

func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, clientErrors evmconfig.ClientErrors, lggr logger.Logger, chainID *big.Int, nodes []*toml.Node, chainType chaintype.ChainType) Client {
var empty url.URL
var primaries []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]
var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
largePayloadRPCTimeout, defaultRPCTimeout := getRPCTimeouts(chainType)
for i, node := range nodes {
var ws *url.URL
if node.WSURL != nil {
ws = (*url.URL)(node.WSURL)
}
if node.SendOnly != nil && *node.SendOnly {
rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
rpc := NewRPCClient(lggr, nil, (*url.URL)(node.HTTPURL), *node.Name, i, chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL),
*node.Name, chainID, rpc)
sendonlys = append(sendonlys, sendonly)
} else {
rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i),
rpc := NewRPCClient(lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i,
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
primaryNode := commonclient.NewNode(cfg, chainCfg,
lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order,
lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, *node.Order,
rpc, "EVM")
primaries = append(primaries, primaryNode)
}
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func NewChainClientWithTestNode(
rpcUrl string,
rpcHTTPURL *url.URL,
sendonlyRPCURLs []url.URL,
id int32,
id int,
chainID *big.Int,
) (Client, error) {
parsed, err := url.ParseRequestURI(rpcUrl)
Expand All @@ -148,10 +148,10 @@ func NewChainClientWithTestNode(
}

lggr := logger.Test(t)
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}

var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
Expand All @@ -160,7 +160,7 @@ func NewChainClientWithTestNode(
return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String())
}
var empty url.URL
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, &empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
s := commonclient.NewSendOnlyNode[*big.Int, RPCClient](
lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc)
sendonlys = append(sendonlys, s)
Expand Down Expand Up @@ -206,7 +206,7 @@ func NewChainClientWithMockedRpc(
parsed, _ := url.ParseRequestURI("ws://test")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}
clientErrors := NewTestClientErrors()
c := NewChainClient(lggr, selectionMode, leaseDuration, noNewHeadsThreshold, primaries, nil, chainID, chainType, &clientErrors, 0)
Expand Down
Loading
Loading