From cf785b98b00808e26ba589d3223a41a985f3ae9d Mon Sep 17 00:00:00 2001 From: Alexander Korelskiy Date: Wed, 7 Aug 2024 15:43:36 +0300 Subject: [PATCH] Migrate to new JetStream API, update linter settings, use go 1.22 --- .github/workflows/go.yml | 4 +- .github/workflows/golangci-lint.yml | 5 +-- .golangci.yml | 36 +++++++--------- Dockerfile | 2 +- README.md | 12 ++---- go.mod | 27 ++++++------ go.sum | 64 ++++++++++++++++------------- pkg/app/app.go | 17 ++++---- pkg/app/handlers.go | 2 +- pkg/app/queue.go | 11 ++--- pkg/app/queue_test.go | 5 ++- pkg/rpcqueue/client.go | 5 ++- pkg/rpcqueue/rpcqueue.go | 36 ++++++++-------- 13 files changed, 111 insertions(+), 115 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index ebd17ad..c409b9a 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -26,7 +26,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: '1.20.x' + go-version: '1.22.x' - name: Test env: @@ -41,7 +41,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-go-version: '1.20.x' + go-go-version: '1.22.x' - name: Build run: go build -v ./... diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index fa32cbd..7f22cf9 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -5,7 +5,6 @@ on: - v* branches: - master - - main pull_request: permissions: contents: read @@ -18,13 +17,13 @@ jobs: steps: - uses: actions/setup-go@v3 with: - go-version: '1.20.x' + go-version: '1.22.x' - uses: actions/checkout@v3 - name: golangci-lint uses: golangci/golangci-lint-action@v3 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: v1.54.2 + version: v1.59.1 # Optional: working directory, useful for monorepos # working-directory: somedir diff --git a/.golangci.yml b/.golangci.yml index 5934d60..d2afec8 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -11,13 +11,23 @@ run: # include test files or not, default is true tests: true - # skip files - skip-files: +issues: + # Which files to exclude: they will be analyzed, but issues from them won't be reported. + # There is no need to include all autogenerated files, + # we confidently recognize autogenerated files. + # If it's not, please let us know. + # "/" will be replaced by current OS file path separator to properly work on Windows. + # Default: [] + exclude-files: - ".*\\_gen\\.go$" output: - # colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number" - format: colored-line-number + # Default: + # formats: + # - format: colored-line-number + # path: stdout + formats: + - format: colored-line-number # print lines of code with issue, default is true print-issued-lines: true @@ -34,32 +44,14 @@ linters-settings: # Enable multiple checks by tags, run `GL_DEBUG=gocritic golangci-lint` run to see all tags and checks. # Empty list by default. See https://github.com/go-critic/go-critic#usage -> section "Tags". enabled-checks: - - appendAssign - appendCombine - - assignOp - - badCond - boolExprSimplify - - captLocal - - caseOrder - - defaultCaseOrder - - dupArg - - dupBranchBody - - dupCase - - dupSubExpr - - elseif - emptyFallthrough - emptyStringTest - equalFold - - exitAfterDefer - - flagName - hexLiteral - indexAlloc - nilValReturn - - offBy1 - - regexpMust - - sloppyLen - - switchTrue - - wrapperFunc - yodaStyleExpr linters: diff --git a/Dockerfile b/Dockerfile index 4cf8829..0a61a11 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20-alpine AS builder +FROM golang:1.22-alpine AS builder COPY . /build RUN cd /build && go install -mod=mod ./cmd/brokersrv diff --git a/README.md b/README.md index 9dcf4fa..c116500 100644 --- a/README.md +++ b/README.md @@ -34,24 +34,18 @@ RpcServices = [ "testsrv" ] ... import ( - "github.com/nats-io/nats.go" "github.com/vmkteam/brokersrv/pkg/rpcqueue" ) ... -nc, err := nats.Connect("nats://localhost:4222", nats.Name("testsrv"), nats.MaxReconnects(100), nats.ReconnectWait(3*time.Second)) +nc, err := rpcqueue.NewClient("nats://localhost:4222", "testsrv") ... -js, err := nc.JetStream() - - -... - -rpcQueue := rpcqueue.New("testsrv", js, zenrpcSrv, someLoggerPrintF) - +rpcQueue := rpcqueue.New("testsrv", nc.JetStreamConn, zenrpcSrv, someLoggerPrintF) go rpcQueue.Run() + ``` ### Send test RPC request diff --git a/go.mod b/go.mod index 13e9d98..8b8ae8c 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,15 @@ module github.com/vmkteam/brokersrv -go 1.20 +go 1.22 require ( - github.com/BurntSushi/toml v1.3.2 - github.com/labstack/echo/v4 v4.11.4 + github.com/BurntSushi/toml v1.4.0 + github.com/labstack/echo/v4 v4.12.0 github.com/namsral/flag v1.7.4-pre - github.com/nats-io/nats.go v1.32.0 - github.com/prometheus/client_golang v1.18.0 - github.com/vmkteam/zenrpc-middleware v1.1.5 - github.com/vmkteam/zenrpc/v2 v2.2.11 + github.com/nats-io/nats.go v1.36.0 + github.com/prometheus/client_golang v1.19.1 + github.com/vmkteam/zenrpc-middleware v1.1.6 + github.com/vmkteam/zenrpc/v2 v2.2.12 ) require ( @@ -24,11 +24,10 @@ require ( github.com/labstack/gommon v0.4.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect @@ -37,10 +36,10 @@ require ( github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect github.com/vmihailenco/tagparser v0.1.2 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - golang.org/x/crypto v0.18.0 // indirect - golang.org/x/net v0.19.0 // indirect - golang.org/x/sys v0.16.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + golang.org/x/crypto v0.25.0 // indirect + golang.org/x/net v0.24.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect mellium.im/sasl v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index e6b9ae1..2db3d36 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= -github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -19,6 +19,7 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4 github.com/getsentry/sentry-go v0.13.0 h1:20dgTiUSfxRB/EhMPtxcL9ZEbM1ZdR+W/7f7NWD+xWo= github.com/getsentry/sentry-go v0.13.0/go.mod h1:EOsfu5ZdvKPfeHYV6pTVQnsjfp30+XA7//UooKNumH0= github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= +github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-pg/pg/v10 v10.10.6 h1:1vNtPZ4Z9dWUw/TjJwOfFUbF5nEq1IkR6yG8Mq/Iwso= github.com/go-pg/pg/v10 v10.10.6/go.mod h1:GLmFXufrElQHf5uzM3BQlcfwV3nsgnHue5uzjQ6Nqxg= github.com/go-pg/zerochecker v0.2.0 h1:pp7f72c3DobMWOb2ErtZsnrPaSvHd2W4o9//8HtF4mU= @@ -35,14 +36,14 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -52,11 +53,12 @@ github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/labstack/echo/v4 v4.11.4 h1:vDZmA+qNeh1pd/cCkEicDMrjtrnMGQ1QFI9gWN1zGq8= -github.com/labstack/echo/v4 v4.11.4/go.mod h1:noh7EvLwqDsmh/X/HWKPUl1AjzJrhyptRyEbQJfxen8= +github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0= +github.com/labstack/echo/v4 v4.12.0/go.mod h1:UP9Cr2DJXbOK3Kr9ONYzNowSh7HP0aG0ShAyycHSJvM= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -64,12 +66,10 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= -github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/namsral/flag v1.7.4-pre h1:b2ScHhoCUkbsq0d2C15Mv+VU8bl8hAXV8arnWiOHNZs= github.com/namsral/flag v1.7.4-pre/go.mod h1:OXldTctbM6SWH1K899kPZcf65KxJiD7MsceFUpB5yDo= -github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0= -github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= +github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -86,24 +86,28 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= -github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= -github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= -github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo= github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -119,16 +123,16 @@ github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vb github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= -github.com/vmkteam/zenrpc-middleware v1.1.5 h1:4A6cuGtwdkyB+OvK/VJBuoThFdpIqp2tEdtJlRv2sMQ= -github.com/vmkteam/zenrpc-middleware v1.1.5/go.mod h1:gwy1/fGceUMArJ+lLHN1IQtw7CBMFoBfG38TyN0O4II= -github.com/vmkteam/zenrpc/v2 v2.2.11 h1:JfB7QYWhFPXhw9FZuVk5FBwtoVesYscmS3RrSqDK1Zc= -github.com/vmkteam/zenrpc/v2 v2.2.11/go.mod h1:T/ZQlJbKThBNJtyN0313xEPcxjEyB19uNldTBr0o2KE= +github.com/vmkteam/zenrpc-middleware v1.1.6 h1:6vqmuK/GOYiOw9dQ52KDJqtoJCI5mKzRoahINZfUX5w= +github.com/vmkteam/zenrpc-middleware v1.1.6/go.mod h1:gwy1/fGceUMArJ+lLHN1IQtw7CBMFoBfG38TyN0O4II= +github.com/vmkteam/zenrpc/v2 v2.2.12 h1:McYNxjJPqzyxaB+WGj9KlcEto7ldjvfx9NotzSGsv2o= +github.com/vmkteam/zenrpc/v2 v2.2.12/go.mod h1:T/ZQlJbKThBNJtyN0313xEPcxjEyB19uNldTBr0o2KE= golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= -golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -142,8 +146,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= -golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -163,14 +167,14 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210923061019-b8560ed6a9b7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -195,13 +199,13 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= @@ -209,8 +213,10 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= mellium.im/sasl v0.2.1/go.mod h1:ROaEDLQNuf9vjKqE1SrAfnsobm2YKXT1gnN1uDp1PjQ= diff --git a/pkg/app/app.go b/pkg/app/app.go index 6b646ed..83a0925 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -5,6 +5,7 @@ import ( "log" "time" + "github.com/nats-io/nats.go/jetstream" "github.com/vmkteam/brokersrv/pkg/rpcqueue" "github.com/labstack/echo/v4" @@ -29,7 +30,7 @@ type App struct { cfg Config echo *echo.Echo nc *nats.Conn - js nats.JetStreamContext + js jetstream.JetStream qm *QueueManager } @@ -62,23 +63,21 @@ func (a *App) Run() error { // registerJetStream configure and register stream for NATS JetStream func (a *App) registerJetStream() error { - js, err := a.nc.JetStream() + ctx := context.Background() + js, err := jetstream.New(a.nc) if err != nil { return err } a.js = js - jsCfg := &nats.StreamConfig{ + jsCfg := jetstream.StreamConfig{ Name: rpcqueue.StreamName, - Retention: nats.WorkQueuePolicy, - Storage: nats.FileStorage, + Retention: jetstream.WorkQueuePolicy, + Storage: jetstream.FileStorage, Subjects: []string{rpcqueue.StreamName + ".*"}, } - _, err = js.AddStream(jsCfg) - if err == nats.ErrStreamNameAlreadyInUse { - _, err = js.UpdateStream(jsCfg) - } + _, err = js.CreateOrUpdateStream(ctx, jsCfg) return err } diff --git a/pkg/app/handlers.go b/pkg/app/handlers.go index 8d63ca4..bbfcbbd 100644 --- a/pkg/app/handlers.go +++ b/pkg/app/handlers.go @@ -52,7 +52,7 @@ func (a *App) processRpcServices(c echo.Context) error { return c.JSON(http.StatusInternalServerError, zenrpc.NewResponseError(nil, zenrpc.InvalidParams, "request ID not empty", nil)) } - err = a.qm.Publish(service, req, c.Request().Header) + err = a.qm.Publish(c.Request().Context(), service, req, c.Request().Header) if err != nil { return c.JSON(http.StatusInternalServerError, zenrpc.NewResponseError(nil, zenrpc.InternalError, err.Error(), nil)) } diff --git a/pkg/app/queue.go b/pkg/app/queue.go index c6e4239..37d8e35 100644 --- a/pkg/app/queue.go +++ b/pkg/app/queue.go @@ -1,13 +1,14 @@ package app import ( + "context" "encoding/json" "net/http" "time" + "github.com/nats-io/nats.go/jetstream" "github.com/vmkteam/brokersrv/pkg/rpcqueue" - "github.com/nats-io/nats.go" "github.com/vmkteam/zenrpc/v2" ) @@ -17,18 +18,18 @@ type Message struct { } type QueueManager struct { - js nats.JetStreamContext + js jetstream.JetStream } // NewQueueManager returns new QueueManager. -func NewQueueManager(js nats.JetStreamContext) *QueueManager { +func NewQueueManager(js jetstream.JetStream) *QueueManager { return &QueueManager{ js: js, } } // Publish prepare and publish message to NATs. -func (m *QueueManager) Publish(service string, zenrpcRequest zenrpc.Request, headers http.Header) error { +func (m *QueueManager) Publish(ctx context.Context, service string, zenrpcRequest zenrpc.Request, headers http.Header) error { message := Message{ Request: zenrpcRequest, Header: headers, @@ -39,6 +40,6 @@ func (m *QueueManager) Publish(service string, zenrpcRequest zenrpc.Request, hea return err } - _, err = m.js.Publish(rpcqueue.StreamName+"."+service, bb, nats.AckWait(5*time.Minute)) + _, err = m.js.Publish(ctx, rpcqueue.StreamName+"."+service, bb, jetstream.WithRetryWait(5*time.Minute)) return err } diff --git a/pkg/app/queue_test.go b/pkg/app/queue_test.go index 3487bed..9e3bbb1 100644 --- a/pkg/app/queue_test.go +++ b/pkg/app/queue_test.go @@ -22,7 +22,8 @@ var ( ) func TestQueueManager(t *testing.T) { - err := testApp.qm.Publish(testRpcSrvSubject, testZenrpcRequest, http.Header{}) + ctx := context.Background() + err := testApp.qm.Publish(ctx, testRpcSrvSubject, testZenrpcRequest, http.Header{}) if err != nil { t.Fatal(err) } @@ -33,7 +34,7 @@ func TestQueueManager(t *testing.T) { testQueue := rpcqueue.New(testRpcSrvSubject, testRpcQueueClient.JetStreamConn, testRpc, t.Logf) - err = testQueue.Run() + err = testQueue.Run(ctx) if err != nil { t.Fatal(err) } diff --git a/pkg/rpcqueue/client.go b/pkg/rpcqueue/client.go index b601e24..6af6486 100644 --- a/pkg/rpcqueue/client.go +++ b/pkg/rpcqueue/client.go @@ -4,6 +4,7 @@ import ( "time" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) const ( @@ -16,7 +17,7 @@ type Config struct { } type Client struct { - JetStreamConn nats.JetStreamContext + JetStreamConn jetstream.JetStream NatsConn *nats.Conn } @@ -29,7 +30,7 @@ func NewClient(cfg Config, appName string) (*Client, error) { return nil, err } - js, err := nc.JetStream() + js, err := jetstream.New(nc) if err != nil { return nil, err } diff --git a/pkg/rpcqueue/rpcqueue.go b/pkg/rpcqueue/rpcqueue.go index 23e10b3..eeac47e 100644 --- a/pkg/rpcqueue/rpcqueue.go +++ b/pkg/rpcqueue/rpcqueue.go @@ -8,7 +8,7 @@ import ( "time" "github.com/labstack/echo/v4" - "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/prometheus/client_golang/prometheus" zm "github.com/vmkteam/zenrpc-middleware" "github.com/vmkteam/zenrpc/v2" @@ -28,7 +28,7 @@ type Message struct { type RPCQueue struct { subject string - js nats.JetStreamContext + js jetstream.JetStream srv zenrpc.Server pf Printf } @@ -38,7 +38,7 @@ type Printf func(format string, v ...interface{}) var statEvents *prometheus.CounterVec // New initialize new brokersrv rpc queue. -func New(subject string, js nats.JetStreamContext, srv zenrpc.Server, pf Printf) RPCQueue { +func New(subject string, js jetstream.JetStream, srv zenrpc.Server, pf Printf) RPCQueue { statEvents = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: subject, Subsystem: "rpcqueue", @@ -57,16 +57,20 @@ func New(subject string, js nats.JetStreamContext, srv zenrpc.Server, pf Printf) } // Run subscribe to NATs Streaming subject and process events -func (q *RPCQueue) Run() error { - _, err := q.js.QueueSubscribe( - fmt.Sprintf("%s.%s", StreamName, q.subject), - fmt.Sprintf("group-%s", q.subject), q.handleMessage, - nats.ManualAck(), - nats.Durable(fmt.Sprintf("dur-%s", q.subject)), - nats.BindStream(StreamName), - nats.MaxAckPending(maxAckPending), - nats.AckWait(maxAckWait), - ) +func (q *RPCQueue) Run(ctx context.Context) error { + c, err := q.js.CreateOrUpdateConsumer(ctx, StreamName, jetstream.ConsumerConfig{ + Name: fmt.Sprintf("dur-%s", q.subject), + FilterSubject: fmt.Sprintf("%s.%s", StreamName, q.subject), + Durable: fmt.Sprintf("dur-%s", q.subject), + AckPolicy: jetstream.AckExplicitPolicy, + MaxAckPending: maxAckPending, + AckWait: maxAckWait, + }) + if err != nil { + return err + } + + _, err = c.Consume(q.handleMessage) if err != nil { return err } @@ -75,13 +79,13 @@ func (q *RPCQueue) Run() error { } // handleMessage send message to rpc server and acknowledge event. -func (q *RPCQueue) handleMessage(message *nats.Msg) { +func (q *RPCQueue) handleMessage(message jetstream.Msg) { var ( m Message zenrpcReq zenrpc.Request ) - err := json.Unmarshal(message.Data, &m) + err := json.Unmarshal(message.Data(), &m) if err != nil { statEvents.WithLabelValues("error").Inc() q.pf("failed to unmarshal message err=%q", err) @@ -104,7 +108,7 @@ func (q *RPCQueue) handleMessage(message *nats.Msg) { if err = message.Ack(); err != nil { statEvents.WithLabelValues("error").Inc() - q.pf("failed to ack message=%q err=%q", string(message.Data), err) + q.pf("failed to ack message=%q err=%q", string(message.Data()), err) return }