From 006b4e2ac3de57d1dcc52b22e9c807221c74b92e Mon Sep 17 00:00:00 2001 From: "Kim, JinSan" Date: Mon, 21 Dec 2020 13:56:26 +0900 Subject: [PATCH 1/2] chore: remove grpc/socket abci and make it buildable --- abci/client/client.go | 17 -- abci/client/grpc_client.go | 306 ------------------- abci/client/socket_client.go | 406 -------------------------- abci/client/socket_client_test.go | 126 -------- abci/server/grpc_server.go | 57 ---- abci/server/server.go | 30 -- abci/server/socket_server.go | 252 ---------------- cmd/tendermint/commands/run_node.go | 3 +- config/config.go | 9 +- config/toml.go | 6 +- consensus/replay_file.go | 2 +- docs/tendermint-core/configuration.md | 5 +- node/node.go | 2 +- proxy/client.go | 33 +-- 14 files changed, 11 insertions(+), 1243 deletions(-) delete mode 100644 abci/client/grpc_client.go delete mode 100644 abci/client/socket_client.go delete mode 100644 abci/client/socket_client_test.go delete mode 100644 abci/server/grpc_server.go delete mode 100644 abci/server/server.go delete mode 100644 abci/server/socket_server.go diff --git a/abci/client/client.go b/abci/client/client.go index 4f7c7b69a..182d5720f 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -1,7 +1,6 @@ package abcicli import ( - "fmt" "sync" "github.com/tendermint/tendermint/abci/types" @@ -51,22 +50,6 @@ type Client interface { //---------------------------------------- -// NewClient returns a new ABCI client of the specified transport type. -// It returns an error if the transport is not "socket" or "grpc" -func NewClient(addr, transport string, mustConnect bool) (client Client, err error) { - switch transport { - case "socket": - client = NewSocketClient(addr, mustConnect) - case "grpc": - client = NewGRPCClient(addr, mustConnect) - default: - err = fmt.Errorf("unknown abci transport %s", transport) - } - return -} - -//---------------------------------------- - type Callback func(*types.Request, *types.Response) //---------------------------------------- diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go deleted file mode 100644 index 01583bc1f..000000000 --- a/abci/client/grpc_client.go +++ /dev/null @@ -1,306 +0,0 @@ -package abcicli - -import ( - "fmt" - "net" - "sync" - "time" - - "golang.org/x/net/context" - "google.golang.org/grpc" - - "github.com/tendermint/tendermint/abci/types" - tmnet "github.com/tendermint/tendermint/libs/net" - "github.com/tendermint/tendermint/libs/service" -) - -var _ Client = (*grpcClient)(nil) - -// A stripped copy of the remoteClient that makes -// synchronous calls using grpc -type grpcClient struct { - service.BaseService - mustConnect bool - - client types.ABCIApplicationClient - conn *grpc.ClientConn - - mtx sync.Mutex - addr string - err error - resCb func(*types.Request, *types.Response) // listens to all callbacks -} - -func NewGRPCClient(addr string, mustConnect bool) Client { - cli := &grpcClient{ - addr: addr, - mustConnect: mustConnect, - } - cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli) - return cli -} - -func dialerFunc(ctx context.Context, addr string) (net.Conn, error) { - return tmnet.Connect(addr) -} - -func (cli *grpcClient) OnStart() error { - if err := cli.BaseService.OnStart(); err != nil { - return err - } -RETRY_LOOP: - for { - conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc)) - if err != nil { - if cli.mustConnect { - return err - } - cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr), "err", err) - time.Sleep(time.Second * dialRetryIntervalSeconds) - continue RETRY_LOOP - } - - cli.Logger.Info("Dialed server. Waiting for echo.", "addr", cli.addr) - client := types.NewABCIApplicationClient(conn) - cli.conn = conn - - ENSURE_CONNECTED: - for { - _, err := client.Echo(context.Background(), &types.RequestEcho{Message: "hello"}, grpc.WaitForReady(true)) - if err == nil { - break ENSURE_CONNECTED - } - cli.Logger.Error("Echo failed", "err", err) - time.Sleep(time.Second * echoRetryIntervalSeconds) - } - - cli.client = client - return nil - } -} - -func (cli *grpcClient) OnStop() { - cli.BaseService.OnStop() - - if cli.conn != nil { - cli.conn.Close() - } -} - -func (cli *grpcClient) StopForError(err error) { - cli.mtx.Lock() - if !cli.IsRunning() { - return - } - - if cli.err == nil { - cli.err = err - } - cli.mtx.Unlock() - - cli.Logger.Error(fmt.Sprintf("Stopping abci.grpcClient for error: %v", err.Error())) - cli.Stop() -} - -func (cli *grpcClient) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() - return cli.err -} - -// Set listener for all responses -// NOTE: callback may get internally generated flush responses. -func (cli *grpcClient) SetResponseCallback(resCb Callback) { - cli.mtx.Lock() - cli.resCb = resCb - cli.mtx.Unlock() -} - -//---------------------------------------- -// GRPC calls are synchronous, but some callbacks expect to be called asynchronously -// (eg. the mempool expects to be able to lock to remove bad txs from cache). -// To accommodate, we finish each call in its own go-routine, -// which is expensive, but easy - if you want something better, use the socket protocol! -// maybe one day, if people really want it, we use grpc streams, -// but hopefully not :D - -func (cli *grpcClient) EchoAsync(msg string) *ReqRes { - req := types.ToRequestEcho(msg) - res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Echo{Echo: res}}) -} - -func (cli *grpcClient) FlushAsync() *ReqRes { - req := types.ToRequestFlush() - res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Flush{Flush: res}}) -} - -func (cli *grpcClient) InfoAsync(params types.RequestInfo) *ReqRes { - req := types.ToRequestInfo(params) - res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Info{Info: res}}) -} - -func (cli *grpcClient) SetOptionAsync(params types.RequestSetOption) *ReqRes { - req := types.ToRequestSetOption(params) - res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_SetOption{SetOption: res}}) -} - -func (cli *grpcClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes { - req := types.ToRequestDeliverTx(params) - res, err := cli.client.DeliverTx(context.Background(), req.GetDeliverTx(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_DeliverTx{DeliverTx: res}}) -} - -func (cli *grpcClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes { - req := types.ToRequestCheckTx(params) - res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}) -} - -func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes { - req := types.ToRequestQuery(params) - res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Query{Query: res}}) -} - -func (cli *grpcClient) CommitAsync() *ReqRes { - req := types.ToRequestCommit() - res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_Commit{Commit: res}}) -} - -func (cli *grpcClient) InitChainAsync(params types.RequestInitChain) *ReqRes { - req := types.ToRequestInitChain(params) - res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_InitChain{InitChain: res}}) -} - -func (cli *grpcClient) BeginBlockAsync(params types.RequestBeginBlock) *ReqRes { - req := types.ToRequestBeginBlock(params) - res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginBlock{BeginBlock: res}}) -} - -func (cli *grpcClient) EndBlockAsync(params types.RequestEndBlock) *ReqRes { - req := types.ToRequestEndBlock(params) - res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.WaitForReady(true)) - if err != nil { - cli.StopForError(err) - } - return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndBlock{EndBlock: res}}) -} - -func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes { - reqres := NewReqRes(req) - reqres.Response = res // Set response - reqres.Done() // Release waiters - reqres.SetDone() // so reqRes.SetCallback will run the callback - - // goroutine for callbacks - go func() { - cli.mtx.Lock() - defer cli.mtx.Unlock() - - // Notify client listener if set - if cli.resCb != nil { - cli.resCb(reqres.Request, res) - } - - // Notify reqRes listener if set - if cb := reqres.GetCallback(); cb != nil { - cb(res) - } - }() - - return reqres -} - -//---------------------------------------- - -func (cli *grpcClient) FlushSync() error { - return nil -} - -func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) { - reqres := cli.EchoAsync(msg) - // StopForError should already have been called if error is set - return reqres.Response.GetEcho(), cli.Error() -} - -func (cli *grpcClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { - reqres := cli.InfoAsync(req) - return reqres.Response.GetInfo(), cli.Error() -} - -func (cli *grpcClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) { - reqres := cli.SetOptionAsync(req) - return reqres.Response.GetSetOption(), cli.Error() -} - -func (cli *grpcClient) DeliverTxSync(params types.RequestDeliverTx) (*types.ResponseDeliverTx, error) { - reqres := cli.DeliverTxAsync(params) - return reqres.Response.GetDeliverTx(), cli.Error() -} - -func (cli *grpcClient) CheckTxSync(params types.RequestCheckTx) (*types.ResponseCheckTx, error) { - reqres := cli.CheckTxAsync(params) - return reqres.Response.GetCheckTx(), cli.Error() -} - -func (cli *grpcClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { - reqres := cli.QueryAsync(req) - return reqres.Response.GetQuery(), cli.Error() -} - -func (cli *grpcClient) CommitSync() (*types.ResponseCommit, error) { - reqres := cli.CommitAsync() - return reqres.Response.GetCommit(), cli.Error() -} - -func (cli *grpcClient) InitChainSync(params types.RequestInitChain) (*types.ResponseInitChain, error) { - reqres := cli.InitChainAsync(params) - return reqres.Response.GetInitChain(), cli.Error() -} - -func (cli *grpcClient) BeginBlockSync(params types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { - reqres := cli.BeginBlockAsync(params) - return reqres.Response.GetBeginBlock(), cli.Error() -} - -func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.ResponseEndBlock, error) { - reqres := cli.EndBlockAsync(params) - return reqres.Response.GetEndBlock(), cli.Error() -} diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go deleted file mode 100644 index 7898a8f26..000000000 --- a/abci/client/socket_client.go +++ /dev/null @@ -1,406 +0,0 @@ -package abcicli - -import ( - "bufio" - "container/list" - "errors" - "fmt" - "io" - "net" - "reflect" - "sync" - "time" - - "github.com/tendermint/tendermint/abci/types" - tmnet "github.com/tendermint/tendermint/libs/net" - "github.com/tendermint/tendermint/libs/service" - "github.com/tendermint/tendermint/libs/timer" -) - -const reqQueueSize = 256 // TODO make configurable -// const maxResponseSize = 1048576 // 1MB TODO make configurable -const flushThrottleMS = 20 // Don't wait longer than... - -var _ Client = (*socketClient)(nil) - -// This is goroutine-safe, but users should beware that -// the application in general is not meant to be interfaced -// with concurrent callers. -type socketClient struct { - service.BaseService - - addr string - mustConnect bool - conn net.Conn - - reqQueue chan *ReqRes - flushTimer *timer.ThrottleTimer - - mtx sync.Mutex - err error - reqSent *list.List // list of requests sent, waiting for response - resCb func(*types.Request, *types.Response) // called on all requests, if set. - -} - -func NewSocketClient(addr string, mustConnect bool) Client { - cli := &socketClient{ - reqQueue: make(chan *ReqRes, reqQueueSize), - flushTimer: timer.NewThrottleTimer("socketClient", flushThrottleMS), - mustConnect: mustConnect, - - addr: addr, - reqSent: list.New(), - resCb: nil, - } - cli.BaseService = *service.NewBaseService(nil, "socketClient", cli) - return cli -} - -func (cli *socketClient) OnStart() error { - var err error - var conn net.Conn -RETRY_LOOP: - for { - conn, err = tmnet.Connect(cli.addr) - if err != nil { - if cli.mustConnect { - return err - } - cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr), "err", err) - time.Sleep(time.Second * dialRetryIntervalSeconds) - continue RETRY_LOOP - } - cli.conn = conn - - go cli.sendRequestsRoutine(conn) - go cli.recvResponseRoutine(conn) - - return nil - } -} - -func (cli *socketClient) OnStop() { - if cli.conn != nil { - cli.conn.Close() - } - - cli.mtx.Lock() - defer cli.mtx.Unlock() - cli.flushQueue() -} - -// Stop the client and set the error -func (cli *socketClient) StopForError(err error) { - if !cli.IsRunning() { - return - } - - cli.mtx.Lock() - if cli.err == nil { - cli.err = err - } - cli.mtx.Unlock() - - cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error())) - cli.Stop() -} - -func (cli *socketClient) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() - return cli.err -} - -// Set listener for all responses -// NOTE: callback may get internally generated flush responses. -func (cli *socketClient) SetResponseCallback(resCb Callback) { - cli.mtx.Lock() - cli.resCb = resCb - cli.mtx.Unlock() -} - -//---------------------------------------- - -func (cli *socketClient) sendRequestsRoutine(conn io.Writer) { - - w := bufio.NewWriter(conn) - for { - select { - case <-cli.flushTimer.Ch: - select { - case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): - default: - // Probably will fill the buffer, or retry later. - } - case <-cli.Quit(): - return - case reqres := <-cli.reqQueue: - cli.willSendReq(reqres) - err := types.WriteMessage(reqres.Request, w) - if err != nil { - cli.StopForError(fmt.Errorf("error writing msg: %v", err)) - return - } - // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) - if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { - err = w.Flush() - if err != nil { - cli.StopForError(fmt.Errorf("error flushing writer: %v", err)) - return - } - } - } - } -} - -func (cli *socketClient) recvResponseRoutine(conn io.Reader) { - - r := bufio.NewReader(conn) // Buffer reads - for { - var res = &types.Response{} - err := types.ReadMessage(r, res) - if err != nil { - cli.StopForError(err) - return - } - switch r := res.Value.(type) { - case *types.Response_Exception: - // XXX After setting cli.err, release waiters (e.g. reqres.Done()) - cli.StopForError(errors.New(r.Exception.Error)) - return - default: - // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) - err := cli.didRecvResponse(res) - if err != nil { - cli.StopForError(err) - return - } - } - } -} - -func (cli *socketClient) willSendReq(reqres *ReqRes) { - cli.mtx.Lock() - defer cli.mtx.Unlock() - cli.reqSent.PushBack(reqres) -} - -func (cli *socketClient) didRecvResponse(res *types.Response) error { - cli.mtx.Lock() - defer cli.mtx.Unlock() - - // Get the first ReqRes - next := cli.reqSent.Front() - if next == nil { - return fmt.Errorf("unexpected result type %v when nothing expected", reflect.TypeOf(res.Value)) - } - reqres := next.Value.(*ReqRes) - if !resMatchesReq(reqres.Request, res) { - return fmt.Errorf("unexpected result type %v when response to %v expected", - reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value)) - } - - reqres.Response = res // Set response - reqres.Done() // Release waiters - cli.reqSent.Remove(next) // Pop first item from linked list - - // Notify client listener if set (global callback). - if cli.resCb != nil { - cli.resCb(reqres.Request, res) - } - - // Notify reqRes listener if set (request specific callback). - // NOTE: it is possible this callback isn't set on the reqres object. - // at this point, in which case it will be called after, when it is set. - if cb := reqres.GetCallback(); cb != nil { - cb(res) - } - - return nil -} - -//---------------------------------------- - -func (cli *socketClient) EchoAsync(msg string) *ReqRes { - return cli.queueRequest(types.ToRequestEcho(msg)) -} - -func (cli *socketClient) FlushAsync() *ReqRes { - return cli.queueRequest(types.ToRequestFlush()) -} - -func (cli *socketClient) InfoAsync(req types.RequestInfo) *ReqRes { - return cli.queueRequest(types.ToRequestInfo(req)) -} - -func (cli *socketClient) SetOptionAsync(req types.RequestSetOption) *ReqRes { - return cli.queueRequest(types.ToRequestSetOption(req)) -} - -func (cli *socketClient) DeliverTxAsync(req types.RequestDeliverTx) *ReqRes { - return cli.queueRequest(types.ToRequestDeliverTx(req)) -} - -func (cli *socketClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes { - return cli.queueRequest(types.ToRequestCheckTx(req)) -} - -func (cli *socketClient) QueryAsync(req types.RequestQuery) *ReqRes { - return cli.queueRequest(types.ToRequestQuery(req)) -} - -func (cli *socketClient) CommitAsync() *ReqRes { - return cli.queueRequest(types.ToRequestCommit()) -} - -func (cli *socketClient) InitChainAsync(req types.RequestInitChain) *ReqRes { - return cli.queueRequest(types.ToRequestInitChain(req)) -} - -func (cli *socketClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes { - return cli.queueRequest(types.ToRequestBeginBlock(req)) -} - -func (cli *socketClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes { - return cli.queueRequest(types.ToRequestEndBlock(req)) -} - -//---------------------------------------- - -func (cli *socketClient) FlushSync() error { - reqRes := cli.queueRequest(types.ToRequestFlush()) - if err := cli.Error(); err != nil { - return err - } - reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here - return cli.Error() -} - -func (cli *socketClient) EchoSync(msg string) (*types.ResponseEcho, error) { - reqres := cli.queueRequest(types.ToRequestEcho(msg)) - cli.FlushSync() - return reqres.Response.GetEcho(), cli.Error() -} - -func (cli *socketClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { - reqres := cli.queueRequest(types.ToRequestInfo(req)) - cli.FlushSync() - return reqres.Response.GetInfo(), cli.Error() -} - -func (cli *socketClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) { - reqres := cli.queueRequest(types.ToRequestSetOption(req)) - cli.FlushSync() - return reqres.Response.GetSetOption(), cli.Error() -} - -func (cli *socketClient) DeliverTxSync(req types.RequestDeliverTx) (*types.ResponseDeliverTx, error) { - reqres := cli.queueRequest(types.ToRequestDeliverTx(req)) - cli.FlushSync() - return reqres.Response.GetDeliverTx(), cli.Error() -} - -func (cli *socketClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { - reqres := cli.queueRequest(types.ToRequestCheckTx(req)) - cli.FlushSync() - return reqres.Response.GetCheckTx(), cli.Error() -} - -func (cli *socketClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { - reqres := cli.queueRequest(types.ToRequestQuery(req)) - cli.FlushSync() - return reqres.Response.GetQuery(), cli.Error() -} - -func (cli *socketClient) CommitSync() (*types.ResponseCommit, error) { - reqres := cli.queueRequest(types.ToRequestCommit()) - cli.FlushSync() - return reqres.Response.GetCommit(), cli.Error() -} - -func (cli *socketClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) { - reqres := cli.queueRequest(types.ToRequestInitChain(req)) - cli.FlushSync() - return reqres.Response.GetInitChain(), cli.Error() -} - -func (cli *socketClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { - reqres := cli.queueRequest(types.ToRequestBeginBlock(req)) - cli.FlushSync() - return reqres.Response.GetBeginBlock(), cli.Error() -} - -func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) { - reqres := cli.queueRequest(types.ToRequestEndBlock(req)) - cli.FlushSync() - return reqres.Response.GetEndBlock(), cli.Error() -} - -//---------------------------------------- - -func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { - reqres := NewReqRes(req) - - // TODO: set cli.err if reqQueue times out - cli.reqQueue <- reqres - - // Maybe auto-flush, or unset auto-flush - switch req.Value.(type) { - case *types.Request_Flush: - cli.flushTimer.Unset() - default: - cli.flushTimer.Set() - } - - return reqres -} - -func (cli *socketClient) flushQueue() { - // mark all in-flight messages as resolved (they will get cli.Error()) - for req := cli.reqSent.Front(); req != nil; req = req.Next() { - reqres := req.Value.(*ReqRes) - reqres.Done() - } - - // mark all queued messages as resolved -LOOP: - for { - select { - case reqres := <-cli.reqQueue: - reqres.Done() - default: - break LOOP - } - } -} - -//---------------------------------------- - -func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { - switch req.Value.(type) { - case *types.Request_Echo: - _, ok = res.Value.(*types.Response_Echo) - case *types.Request_Flush: - _, ok = res.Value.(*types.Response_Flush) - case *types.Request_Info: - _, ok = res.Value.(*types.Response_Info) - case *types.Request_SetOption: - _, ok = res.Value.(*types.Response_SetOption) - case *types.Request_DeliverTx: - _, ok = res.Value.(*types.Response_DeliverTx) - case *types.Request_CheckTx: - _, ok = res.Value.(*types.Response_CheckTx) - case *types.Request_Commit: - _, ok = res.Value.(*types.Response_Commit) - case *types.Request_Query: - _, ok = res.Value.(*types.Response_Query) - case *types.Request_InitChain: - _, ok = res.Value.(*types.Response_InitChain) - case *types.Request_BeginBlock: - _, ok = res.Value.(*types.Response_BeginBlock) - case *types.Request_EndBlock: - _, ok = res.Value.(*types.Response_EndBlock) - } - return ok -} diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go deleted file mode 100644 index 37bc2b57a..000000000 --- a/abci/client/socket_client_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package abcicli_test - -import ( - "errors" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - abcicli "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/abci/server" - "github.com/tendermint/tendermint/abci/types" - tmrand "github.com/tendermint/tendermint/libs/rand" - "github.com/tendermint/tendermint/libs/service" -) - -type errorStopper interface { - StopForError(error) -} - -func TestSocketClientStopForErrorDeadlock(t *testing.T) { - c := abcicli.NewSocketClient(":80", false).(errorStopper) - err := errors.New("foo-tendermint") - - // See Issue https://github.com/tendermint/abci/issues/114 - doneChan := make(chan bool) - go func() { - defer close(doneChan) - c.StopForError(err) - c.StopForError(err) - }() - - select { - case <-doneChan: - case <-time.After(time.Second * 4): - t.Fatalf("Test took too long, potential deadlock still exists") - } -} - -func TestProperSyncCalls(t *testing.T) { - app := slowApp{} - - s, c := setupClientServer(t, app) - defer s.Stop() - defer c.Stop() - - resp := make(chan error, 1) - go func() { - // This is BeginBlockSync unrolled.... - reqres := c.BeginBlockAsync(types.RequestBeginBlock{}) - c.FlushSync() - res := reqres.Response.GetBeginBlock() - require.NotNil(t, res) - resp <- c.Error() - }() - - select { - case <-time.After(time.Second): - require.Fail(t, "No response arrived") - case err, ok := <-resp: - require.True(t, ok, "Must not close channel") - assert.NoError(t, err, "This should return success") - } -} - -func TestHangingSyncCalls(t *testing.T) { - app := slowApp{} - - s, c := setupClientServer(t, app) - defer s.Stop() - defer c.Stop() - - resp := make(chan error, 1) - go func() { - // Start BeginBlock and flush it - reqres := c.BeginBlockAsync(types.RequestBeginBlock{}) - flush := c.FlushAsync() - // wait 20 ms for all events to travel socket, but - // no response yet from server - time.Sleep(20 * time.Millisecond) - // kill the server, so the connections break - s.Stop() - - // wait for the response from BeginBlock - reqres.Wait() - flush.Wait() - resp <- c.Error() - }() - - select { - case <-time.After(time.Second): - require.Fail(t, "No response arrived") - case err, ok := <-resp: - require.True(t, ok, "Must not close channel") - assert.Error(t, err, "We should get EOF error") - } -} - -func setupClientServer(t *testing.T, app types.Application) ( - service.Service, abcicli.Client) { - // some port between 20k and 30k - port := 20000 + tmrand.Int32()%10000 - addr := fmt.Sprintf("localhost:%d", port) - - s, err := server.NewServer(addr, "socket", app) - require.NoError(t, err) - err = s.Start() - require.NoError(t, err) - - c := abcicli.NewSocketClient(addr, true) - err = c.Start() - require.NoError(t, err) - - return s, c -} - -type slowApp struct { - types.BaseApplication -} - -func (slowApp) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock { - time.Sleep(200 * time.Millisecond) - return types.ResponseBeginBlock{} -} diff --git a/abci/server/grpc_server.go b/abci/server/grpc_server.go deleted file mode 100644 index 0f74a34aa..000000000 --- a/abci/server/grpc_server.go +++ /dev/null @@ -1,57 +0,0 @@ -package server - -import ( - "net" - - "google.golang.org/grpc" - - "github.com/tendermint/tendermint/abci/types" - tmnet "github.com/tendermint/tendermint/libs/net" - "github.com/tendermint/tendermint/libs/service" -) - -type GRPCServer struct { - service.BaseService - - proto string - addr string - listener net.Listener - server *grpc.Server - - app types.ABCIApplicationServer -} - -// NewGRPCServer returns a new gRPC ABCI server -func NewGRPCServer(protoAddr string, app types.ABCIApplicationServer) service.Service { - proto, addr := tmnet.ProtocolAndAddress(protoAddr) - s := &GRPCServer{ - proto: proto, - addr: addr, - listener: nil, - app: app, - } - s.BaseService = *service.NewBaseService(nil, "ABCIServer", s) - return s -} - -// OnStart starts the gRPC service. -func (s *GRPCServer) OnStart() error { - ln, err := net.Listen(s.proto, s.addr) - if err != nil { - return err - } - - s.listener = ln - s.server = grpc.NewServer() - types.RegisterABCIApplicationServer(s.server, s.app) - - s.Logger.Info("Listening", "proto", s.proto, "addr", s.addr) - go s.server.Serve(s.listener) - - return nil -} - -// OnStop stops the gRPC server. -func (s *GRPCServer) OnStop() { - s.server.Stop() -} diff --git a/abci/server/server.go b/abci/server/server.go deleted file mode 100644 index 6dd13ad02..000000000 --- a/abci/server/server.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Package server is used to start a new ABCI server. - -It contains two server implementation: - * gRPC server - * socket server - -*/ -package server - -import ( - "fmt" - - "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/service" -) - -func NewServer(protoAddr, transport string, app types.Application) (service.Service, error) { - var s service.Service - var err error - switch transport { - case "socket": - s = NewSocketServer(protoAddr, app) - case "grpc": - s = NewGRPCServer(protoAddr, types.NewGRPCApplication(app)) - default: - err = fmt.Errorf("unknown server type %s", transport) - } - return s, err -} diff --git a/abci/server/socket_server.go b/abci/server/socket_server.go deleted file mode 100644 index e68d79599..000000000 --- a/abci/server/socket_server.go +++ /dev/null @@ -1,252 +0,0 @@ -package server - -import ( - "bufio" - "fmt" - "io" - "net" - "os" - "runtime" - "sync" - - "github.com/tendermint/tendermint/abci/types" - tmlog "github.com/tendermint/tendermint/libs/log" - tmnet "github.com/tendermint/tendermint/libs/net" - "github.com/tendermint/tendermint/libs/service" -) - -// var maxNumberConnections = 2 - -type SocketServer struct { - service.BaseService - isLoggerSet bool - - proto string - addr string - listener net.Listener - - connsMtx sync.Mutex - conns map[int]net.Conn - nextConnID int - - appMtx sync.Mutex - app types.Application -} - -func NewSocketServer(protoAddr string, app types.Application) service.Service { - proto, addr := tmnet.ProtocolAndAddress(protoAddr) - s := &SocketServer{ - proto: proto, - addr: addr, - listener: nil, - app: app, - conns: make(map[int]net.Conn), - } - s.BaseService = *service.NewBaseService(nil, "ABCIServer", s) - return s -} - -func (s *SocketServer) SetLogger(l tmlog.Logger) { - s.BaseService.SetLogger(l) - s.isLoggerSet = true -} - -func (s *SocketServer) OnStart() error { - ln, err := net.Listen(s.proto, s.addr) - if err != nil { - return err - } - - s.listener = ln - go s.acceptConnectionsRoutine() - - return nil -} - -func (s *SocketServer) OnStop() { - if err := s.listener.Close(); err != nil { - s.Logger.Error("Error closing listener", "err", err) - } - - s.connsMtx.Lock() - defer s.connsMtx.Unlock() - for id, conn := range s.conns { - delete(s.conns, id) - if err := conn.Close(); err != nil { - s.Logger.Error("Error closing connection", "id", id, "conn", conn, "err", err) - } - } -} - -func (s *SocketServer) addConn(conn net.Conn) int { - s.connsMtx.Lock() - defer s.connsMtx.Unlock() - - connID := s.nextConnID - s.nextConnID++ - s.conns[connID] = conn - - return connID -} - -// deletes conn even if close errs -func (s *SocketServer) rmConn(connID int) error { - s.connsMtx.Lock() - defer s.connsMtx.Unlock() - - conn, ok := s.conns[connID] - if !ok { - return fmt.Errorf("connection %d does not exist", connID) - } - - delete(s.conns, connID) - return conn.Close() -} - -func (s *SocketServer) acceptConnectionsRoutine() { - for { - // Accept a connection - s.Logger.Info("Waiting for new connection...") - conn, err := s.listener.Accept() - if err != nil { - if !s.IsRunning() { - return // Ignore error from listener closing. - } - s.Logger.Error("Failed to accept connection", "err", err) - continue - } - - s.Logger.Info("Accepted a new connection") - - connID := s.addConn(conn) - - closeConn := make(chan error, 2) // Push to signal connection closed - responses := make(chan *types.Response, 1000) // A channel to buffer responses - - // Read requests from conn and deal with them - go s.handleRequests(closeConn, conn, responses) - // Pull responses from 'responses' and write them to conn. - go s.handleResponses(closeConn, conn, responses) - - // Wait until signal to close connection - go s.waitForClose(closeConn, connID) - } -} - -func (s *SocketServer) waitForClose(closeConn chan error, connID int) { - err := <-closeConn - switch { - case err == io.EOF: - s.Logger.Error("Connection was closed by client") - case err != nil: - s.Logger.Error("Connection error", "err", err) - default: - // never happens - s.Logger.Error("Connection was closed") - } - - // Close the connection - if err := s.rmConn(connID); err != nil { - s.Logger.Error("Error closing connection", "err", err) - } -} - -// Read requests from conn and deal with them -func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, responses chan<- *types.Response) { - var count int - var bufReader = bufio.NewReader(conn) - - defer func() { - // make sure to recover from any app-related panics to allow proper socket cleanup - r := recover() - if r != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - err := fmt.Errorf("recovered from panic: %v\n%s", r, buf) - if !s.isLoggerSet { - fmt.Fprintln(os.Stderr, err) - } - closeConn <- err - s.appMtx.Unlock() - } - }() - - for { - - var req = &types.Request{} - err := types.ReadMessage(bufReader, req) - if err != nil { - if err == io.EOF { - closeConn <- err - } else { - closeConn <- fmt.Errorf("error reading message: %w", err) - } - return - } - s.appMtx.Lock() - count++ - s.handleRequest(req, responses) - s.appMtx.Unlock() - } -} - -func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) { - switch r := req.Value.(type) { - case *types.Request_Echo: - responses <- types.ToResponseEcho(r.Echo.Message) - case *types.Request_Flush: - responses <- types.ToResponseFlush() - case *types.Request_Info: - res := s.app.Info(*r.Info) - responses <- types.ToResponseInfo(res) - case *types.Request_SetOption: - res := s.app.SetOption(*r.SetOption) - responses <- types.ToResponseSetOption(res) - case *types.Request_DeliverTx: - res := s.app.DeliverTx(*r.DeliverTx) - responses <- types.ToResponseDeliverTx(res) - case *types.Request_CheckTx: - res := s.app.CheckTx(*r.CheckTx) - responses <- types.ToResponseCheckTx(res) - case *types.Request_Commit: - res := s.app.Commit() - responses <- types.ToResponseCommit(res) - case *types.Request_Query: - res := s.app.Query(*r.Query) - responses <- types.ToResponseQuery(res) - case *types.Request_InitChain: - res := s.app.InitChain(*r.InitChain) - responses <- types.ToResponseInitChain(res) - case *types.Request_BeginBlock: - res := s.app.BeginBlock(*r.BeginBlock) - responses <- types.ToResponseBeginBlock(res) - case *types.Request_EndBlock: - res := s.app.EndBlock(*r.EndBlock) - responses <- types.ToResponseEndBlock(res) - default: - responses <- types.ToResponseException("Unknown request") - } -} - -// Pull responses from 'responses' and write them to conn. -func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, responses <-chan *types.Response) { - var count int - var bufWriter = bufio.NewWriter(conn) - for { - var res = <-responses - err := types.WriteMessage(res, bufWriter) - if err != nil { - closeConn <- fmt.Errorf("error writing message: %w", err) - return - } - if _, ok := res.Value.(*types.Response_Flush); ok { - err = bufWriter.Flush() - if err != nil { - closeConn <- fmt.Errorf("error flushing write buffer: %w", err) - return - } - } - count++ - } -} diff --git a/cmd/tendermint/commands/run_node.go b/cmd/tendermint/commands/run_node.go index 628a0d173..d9a8b21de 100644 --- a/cmd/tendermint/commands/run_node.go +++ b/cmd/tendermint/commands/run_node.go @@ -43,11 +43,10 @@ func AddNodeFlags(cmd *cobra.Command) { cmd.Flags().String( "proxy_app", config.ProxyApp, - "Proxy app address, or one of: 'kvstore',"+ + "Proxy app one of: 'kvstore',"+ " 'persistent_kvstore',"+ " 'counter',"+ " 'counter_serial' or 'noop' for local testing.") - cmd.Flags().String("abci", config.ABCI, "Specify abci transport (socket | grpc)") // rpc flags cmd.Flags().String("rpc.laddr", config.RPC.ListenAddress, "RPC listen address. Port required") diff --git a/config/config.go b/config/config.go index dbc8f8311..ee94b87e3 100644 --- a/config/config.go +++ b/config/config.go @@ -148,8 +148,7 @@ type BaseConfig struct { //nolint: maligned // This should be set in viper so it can unmarshal into this struct RootDir string `mapstructure:"home"` - // TCP or UNIX socket address of the ABCI application, - // or the name of an ABCI application compiled in with the Tendermint binary + // the name of an ABCI application compiled in with the Tendermint binary ProxyApp string `mapstructure:"proxy_app"` // A custom human readable name for this node @@ -203,9 +202,6 @@ type BaseConfig struct { //nolint: maligned // A JSON file containing the private key to use for p2p authenticated encryption NodeKey string `mapstructure:"node_key_file"` - // Mechanism to connect to the ABCI application: socket | grpc - ABCI string `mapstructure:"abci"` - // TCP or UNIX socket address for the profiling server to listen on ProfListenAddress string `mapstructure:"prof_laddr"` @@ -222,8 +218,7 @@ func DefaultBaseConfig() BaseConfig { PrivValidatorState: defaultPrivValStatePath, NodeKey: defaultNodeKeyPath, Moniker: defaultMoniker, - ProxyApp: "tcp://127.0.0.1:26658", - ABCI: "socket", + ProxyApp: "", LogLevel: DefaultPackageLogLevels(), LogFormat: LogFormatPlain, ProfListenAddress: "", diff --git a/config/toml.go b/config/toml.go index 6c83805cf..46c7b7253 100644 --- a/config/toml.go +++ b/config/toml.go @@ -74,8 +74,7 @@ const defaultConfigTemplate = `# This is a TOML config file. ##### main base config options ##### -# TCP or UNIX socket address of the ABCI application, -# or the name of an ABCI application compiled in with the Tendermint binary +# the name of an ABCI application compiled in with the Tendermint binary proxy_app = "{{ .BaseConfig.ProxyApp }}" # A custom human readable name for this node @@ -131,9 +130,6 @@ priv_validator_laddr = "{{ .BaseConfig.PrivValidatorListenAddr }}" # Path to the JSON file containing the private key to use for node authentication in the p2p protocol node_key_file = "{{ js .BaseConfig.NodeKey }}" -# Mechanism to connect to the ABCI application: socket | grpc -abci = "{{ .BaseConfig.ABCI }}" - # TCP or UNIX socket address for the profiling server to listen on prof_laddr = "{{ .BaseConfig.ProfListenAddress }}" diff --git a/consensus/replay_file.go b/consensus/replay_file.go index b8b8c51da..b04917735 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -292,7 +292,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo } // Create proxyAppConn connection (consensus, mempool, query) - clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) + clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.DBDir()) proxyApp := proxy.NewAppConns(clientCreator) err = proxyApp.Start() if err != nil { diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index 141645f26..44a0fbee7 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -27,9 +27,8 @@ like the file below, however, double check by inspecting the ##### main base config options ##### -# TCP or UNIX socket address of the ABCI application, -# or the name of an ABCI application compiled in with the Tendermint binary -proxy_app = "tcp://127.0.0.1:26658" +# the name of an ABCI application compiled in with the Tendermint binary +proxy_app = "kvstore" # A custom human readable name for this node moniker = "anonymous" diff --git a/node/node.go b/node/node.go index 1ad026984..2f4601d54 100644 --- a/node/node.go +++ b/node/node.go @@ -95,7 +95,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { return NewNode(config, privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()), nodeKey, - proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), + proxy.DefaultClientCreator(config.ProxyApp, config.DBDir()), DefaultGenesisDocProviderFunc(config), DefaultDBProvider, DefaultMetricsProvider(config.Instrumentation), diff --git a/proxy/client.go b/proxy/client.go index ed48dbc96..6e739e6fb 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -1,10 +1,9 @@ package proxy import ( + "fmt" "sync" - "github.com/pkg/errors" - abcicli "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/kvstore" @@ -35,35 +34,10 @@ func (l *localClientCreator) NewABCIClient() (abcicli.Client, error) { return abcicli.NewLocalClient(l.mtx, l.app), nil } -//--------------------------------------------------------------- -// remote proxy opens new connections to an external app process - -type remoteClientCreator struct { - addr string - transport string - mustConnect bool -} - -func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator { - return &remoteClientCreator{ - addr: addr, - transport: transport, - mustConnect: mustConnect, - } -} - -func (r *remoteClientCreator) NewABCIClient() (abcicli.Client, error) { - remoteApp, err := abcicli.NewClient(r.addr, r.transport, r.mustConnect) - if err != nil { - return nil, errors.Wrap(err, "Failed to connect to proxy") - } - return remoteApp, nil -} - //----------------------------------------------------------------- // default -func DefaultClientCreator(addr, transport, dbDir string) ClientCreator { +func DefaultClientCreator(addr, dbDir string) ClientCreator { switch addr { case "counter": return NewLocalClientCreator(counter.NewApplication(false)) @@ -76,7 +50,6 @@ func DefaultClientCreator(addr, transport, dbDir string) ClientCreator { case "noop": return NewLocalClientCreator(types.NewBaseApplication()) default: - mustConnect := false // loop retrying - return NewRemoteClientCreator(addr, transport, mustConnect) + panic(fmt.Sprintf("unknown addr=%s", addr)) } } From 01cd97fc44ceaa48c616dbe3b7603f3eda4f9661 Mon Sep 17 00:00:00 2001 From: "Kim, JinSan" Date: Mon, 21 Dec 2020 16:06:45 +0900 Subject: [PATCH 2/2] chore: make it pass tests --- abci/cmd/abci-cli/abci-cli.go | 745 --------------------------- abci/cmd/abci-cli/main.go | 14 - abci/example/example.go | 3 - abci/example/example_test.go | 156 ------ abci/example/kvstore/kvstore_test.go | 121 ----- abci/tests/client_server_test.go | 27 - abci/tests/server/client.go | 96 ---- abci/tests/test_app/app.go | 78 --- abci/tests/test_app/main.go | 85 --- abci/tests/test_app/test.sh | 28 - mempool/clist_mempool_test.go | 58 --- node/node_test.go | 2 +- proxy/app_conn_test.go | 153 ------ 13 files changed, 1 insertion(+), 1565 deletions(-) delete mode 100644 abci/cmd/abci-cli/abci-cli.go delete mode 100644 abci/cmd/abci-cli/main.go delete mode 100644 abci/example/example.go delete mode 100644 abci/example/example_test.go delete mode 100644 abci/tests/client_server_test.go delete mode 100644 abci/tests/server/client.go delete mode 100644 abci/tests/test_app/app.go delete mode 100644 abci/tests/test_app/main.go delete mode 100755 abci/tests/test_app/test.sh delete mode 100644 proxy/app_conn_test.go diff --git a/abci/cmd/abci-cli/abci-cli.go b/abci/cmd/abci-cli/abci-cli.go deleted file mode 100644 index d5a9aca27..000000000 --- a/abci/cmd/abci-cli/abci-cli.go +++ /dev/null @@ -1,745 +0,0 @@ -package main - -import ( - "bufio" - "encoding/hex" - "errors" - "fmt" - "io" - "os" - "strings" - - "github.com/spf13/cobra" - - "github.com/tendermint/tendermint/libs/log" - tmos "github.com/tendermint/tendermint/libs/os" - - abcicli "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/abci/example/code" - "github.com/tendermint/tendermint/abci/example/counter" - "github.com/tendermint/tendermint/abci/example/kvstore" - "github.com/tendermint/tendermint/abci/server" - servertest "github.com/tendermint/tendermint/abci/tests/server" - "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/abci/version" - "github.com/tendermint/tendermint/crypto/merkle" -) - -// client is a global variable so it can be reused by the console -var ( - client abcicli.Client - logger log.Logger -) - -// flags -var ( - // global - flagAddress string - flagAbci string - flagVerbose bool // for the println output - flagLogLevel string // for the logger - - // query - flagPath string - flagHeight int - flagProve bool - - // counter - flagSerial bool - - // kvstore - flagPersist string -) - -var RootCmd = &cobra.Command{ - Use: "abci-cli", - Short: "the ABCI CLI tool wraps an ABCI client", - Long: "the ABCI CLI tool wraps an ABCI client and is used for testing ABCI servers", - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - - switch cmd.Use { - case "counter", "kvstore": // for the examples apps, don't pre-run - return nil - case "version": // skip running for version command - return nil - } - - if logger == nil { - allowLevel, err := log.AllowLevel(flagLogLevel) - if err != nil { - return err - } - logger = log.NewFilter(log.NewTMLogger(log.NewSyncWriter(os.Stdout)), allowLevel) - } - if client == nil { - var err error - client, err = abcicli.NewClient(flagAddress, flagAbci, false) - if err != nil { - return err - } - client.SetLogger(logger.With("module", "abci-client")) - if err := client.Start(); err != nil { - return err - } - } - return nil - }, -} - -// Structure for data passed to print response. -type response struct { - // generic abci response - Data []byte - Code uint32 - Info string - Log string - - Query *queryResponse -} - -type queryResponse struct { - Key []byte - Value []byte - Height int64 - Proof *merkle.Proof -} - -func Execute() error { - addGlobalFlags() - addCommands() - return RootCmd.Execute() -} - -func addGlobalFlags() { - RootCmd.PersistentFlags().StringVarP(&flagAddress, - "address", - "", - "tcp://0.0.0.0:26658", - "address of application socket") - RootCmd.PersistentFlags().StringVarP(&flagAbci, "abci", "", "socket", "either socket or grpc") - RootCmd.PersistentFlags().BoolVarP(&flagVerbose, - "verbose", - "v", - false, - "print the command and results as if it were a console session") - RootCmd.PersistentFlags().StringVarP(&flagLogLevel, "log_level", "", "debug", "set the logger level") -} - -func addQueryFlags() { - queryCmd.PersistentFlags().StringVarP(&flagPath, "path", "", "/store", "path to prefix query with") - queryCmd.PersistentFlags().IntVarP(&flagHeight, "height", "", 0, "height to query the blockchain at") - queryCmd.PersistentFlags().BoolVarP(&flagProve, - "prove", - "", - false, - "whether or not to return a merkle proof of the query result") -} - -func addCounterFlags() { - counterCmd.PersistentFlags().BoolVarP(&flagSerial, "serial", "", false, "enforce incrementing (serial) transactions") -} - -func addKVStoreFlags() { - kvstoreCmd.PersistentFlags().StringVarP(&flagPersist, "persist", "", "", "directory to use for a database") -} - -func addCommands() { - RootCmd.AddCommand(batchCmd) - RootCmd.AddCommand(consoleCmd) - RootCmd.AddCommand(echoCmd) - RootCmd.AddCommand(infoCmd) - RootCmd.AddCommand(setOptionCmd) - RootCmd.AddCommand(deliverTxCmd) - RootCmd.AddCommand(checkTxCmd) - RootCmd.AddCommand(commitCmd) - RootCmd.AddCommand(versionCmd) - RootCmd.AddCommand(testCmd) - addQueryFlags() - RootCmd.AddCommand(queryCmd) - - // examples - addCounterFlags() - RootCmd.AddCommand(counterCmd) - addKVStoreFlags() - RootCmd.AddCommand(kvstoreCmd) -} - -var batchCmd = &cobra.Command{ - Use: "batch", - Short: "run a batch of abci commands against an application", - Long: `run a batch of abci commands against an application - -This command is run by piping in a file containing a series of commands -you'd like to run: - - abci-cli batch < example.file - -where example.file looks something like: - - set_option serial on - check_tx 0x00 - check_tx 0xff - deliver_tx 0x00 - check_tx 0x00 - deliver_tx 0x01 - deliver_tx 0x04 - info -`, - Args: cobra.ExactArgs(0), - RunE: cmdBatch, -} - -var consoleCmd = &cobra.Command{ - Use: "console", - Short: "start an interactive ABCI console for multiple commands", - Long: `start an interactive ABCI console for multiple commands - -This command opens an interactive console for running any of the other commands -without opening a new connection each time -`, - Args: cobra.ExactArgs(0), - ValidArgs: []string{"echo", "info", "set_option", "deliver_tx", "check_tx", "commit", "query"}, - RunE: cmdConsole, -} - -var echoCmd = &cobra.Command{ - Use: "echo", - Short: "have the application echo a message", - Long: "have the application echo a message", - Args: cobra.ExactArgs(1), - RunE: cmdEcho, -} -var infoCmd = &cobra.Command{ - Use: "info", - Short: "get some info about the application", - Long: "get some info about the application", - Args: cobra.ExactArgs(0), - RunE: cmdInfo, -} -var setOptionCmd = &cobra.Command{ - Use: "set_option", - Short: "set an option on the application", - Long: "set an option on the application", - Args: cobra.ExactArgs(2), - RunE: cmdSetOption, -} - -var deliverTxCmd = &cobra.Command{ - Use: "deliver_tx", - Short: "deliver a new transaction to the application", - Long: "deliver a new transaction to the application", - Args: cobra.ExactArgs(1), - RunE: cmdDeliverTx, -} - -var checkTxCmd = &cobra.Command{ - Use: "check_tx", - Short: "validate a transaction", - Long: "validate a transaction", - Args: cobra.ExactArgs(1), - RunE: cmdCheckTx, -} - -var commitCmd = &cobra.Command{ - Use: "commit", - Short: "commit the application state and return the Merkle root hash", - Long: "commit the application state and return the Merkle root hash", - Args: cobra.ExactArgs(0), - RunE: cmdCommit, -} - -var versionCmd = &cobra.Command{ - Use: "version", - Short: "print ABCI console version", - Long: "print ABCI console version", - Args: cobra.ExactArgs(0), - RunE: func(cmd *cobra.Command, args []string) error { - fmt.Println(version.Version) - return nil - }, -} - -var queryCmd = &cobra.Command{ - Use: "query", - Short: "query the application state", - Long: "query the application state", - Args: cobra.ExactArgs(1), - RunE: cmdQuery, -} - -var counterCmd = &cobra.Command{ - Use: "counter", - Short: "ABCI demo example", - Long: "ABCI demo example", - Args: cobra.ExactArgs(0), - RunE: cmdCounter, -} - -var kvstoreCmd = &cobra.Command{ - Use: "kvstore", - Short: "ABCI demo example", - Long: "ABCI demo example", - Args: cobra.ExactArgs(0), - RunE: cmdKVStore, -} - -var testCmd = &cobra.Command{ - Use: "test", - Short: "run integration tests", - Long: "run integration tests", - Args: cobra.ExactArgs(0), - RunE: cmdTest, -} - -// Generates new Args array based off of previous call args to maintain flag persistence -func persistentArgs(line []byte) []string { - - // generate the arguments to run from original os.Args - // to maintain flag arguments - args := os.Args - args = args[:len(args)-1] // remove the previous command argument - - if len(line) > 0 { // prevents introduction of extra space leading to argument parse errors - args = append(args, strings.Split(string(line), " ")...) - } - return args -} - -//-------------------------------------------------------------------------------- - -func compose(fs []func() error) error { - if len(fs) == 0 { - return nil - } - - err := fs[0]() - if err == nil { - return compose(fs[1:]) - } - - return err -} - -func cmdTest(cmd *cobra.Command, args []string) error { - return compose( - []func() error{ - func() error { return servertest.InitChain(client) }, - func() error { return servertest.SetOption(client, "serial", "on") }, - func() error { return servertest.Commit(client, nil) }, - func() error { return servertest.DeliverTx(client, []byte("abc"), code.CodeTypeBadNonce, nil) }, - func() error { return servertest.Commit(client, nil) }, - func() error { return servertest.DeliverTx(client, []byte{0x00}, code.CodeTypeOK, nil) }, - func() error { return servertest.Commit(client, []byte{0, 0, 0, 0, 0, 0, 0, 1}) }, - func() error { return servertest.DeliverTx(client, []byte{0x00}, code.CodeTypeBadNonce, nil) }, - func() error { return servertest.DeliverTx(client, []byte{0x01}, code.CodeTypeOK, nil) }, - func() error { return servertest.DeliverTx(client, []byte{0x00, 0x02}, code.CodeTypeOK, nil) }, - func() error { return servertest.DeliverTx(client, []byte{0x00, 0x03}, code.CodeTypeOK, nil) }, - func() error { return servertest.DeliverTx(client, []byte{0x00, 0x00, 0x04}, code.CodeTypeOK, nil) }, - func() error { - return servertest.DeliverTx(client, []byte{0x00, 0x00, 0x06}, code.CodeTypeBadNonce, nil) - }, - func() error { return servertest.Commit(client, []byte{0, 0, 0, 0, 0, 0, 0, 5}) }, - }) -} - -func cmdBatch(cmd *cobra.Command, args []string) error { - bufReader := bufio.NewReader(os.Stdin) -LOOP: - for { - - line, more, err := bufReader.ReadLine() - switch { - case more: - return errors.New("input line is too long") - case err == io.EOF: - break LOOP - case len(line) == 0: - continue - case err != nil: - return err - } - - cmdArgs := persistentArgs(line) - if err := muxOnCommands(cmd, cmdArgs); err != nil { - return err - } - fmt.Println() - } - return nil -} - -func cmdConsole(cmd *cobra.Command, args []string) error { - for { - fmt.Printf("> ") - bufReader := bufio.NewReader(os.Stdin) - line, more, err := bufReader.ReadLine() - if more { - return errors.New("input is too long") - } else if err != nil { - return err - } - - pArgs := persistentArgs(line) - if err := muxOnCommands(cmd, pArgs); err != nil { - return err - } - } -} - -func muxOnCommands(cmd *cobra.Command, pArgs []string) error { - if len(pArgs) < 2 { - return errors.New("expecting persistent args of the form: abci-cli [command] <...>") - } - - // TODO: this parsing is fragile - args := []string{} - for i := 0; i < len(pArgs); i++ { - arg := pArgs[i] - - // check for flags - if strings.HasPrefix(arg, "-") { - // if it has an equal, we can just skip - if strings.Contains(arg, "=") { - continue - } - // if its a boolean, we can just skip - _, err := cmd.Flags().GetBool(strings.TrimLeft(arg, "-")) - if err == nil { - continue - } - - // otherwise, we need to skip the next one too - i++ - continue - } - - // append the actual arg - args = append(args, arg) - } - var subCommand string - var actualArgs []string - if len(args) > 1 { - subCommand = args[1] - } - if len(args) > 2 { - actualArgs = args[2:] - } - cmd.Use = subCommand // for later print statements ... - - switch strings.ToLower(subCommand) { - case "check_tx": - return cmdCheckTx(cmd, actualArgs) - case "commit": - return cmdCommit(cmd, actualArgs) - case "deliver_tx": - return cmdDeliverTx(cmd, actualArgs) - case "echo": - return cmdEcho(cmd, actualArgs) - case "info": - return cmdInfo(cmd, actualArgs) - case "query": - return cmdQuery(cmd, actualArgs) - case "set_option": - return cmdSetOption(cmd, actualArgs) - default: - return cmdUnimplemented(cmd, pArgs) - } -} - -func cmdUnimplemented(cmd *cobra.Command, args []string) error { - msg := "unimplemented command" - - if len(args) > 0 { - msg += fmt.Sprintf(" args: [%s]", strings.Join(args, " ")) - } - printResponse(cmd, args, response{ - Code: codeBad, - Log: msg, - }) - - fmt.Println("Available commands:") - fmt.Printf("%s: %s\n", echoCmd.Use, echoCmd.Short) - fmt.Printf("%s: %s\n", infoCmd.Use, infoCmd.Short) - fmt.Printf("%s: %s\n", checkTxCmd.Use, checkTxCmd.Short) - fmt.Printf("%s: %s\n", deliverTxCmd.Use, deliverTxCmd.Short) - fmt.Printf("%s: %s\n", queryCmd.Use, queryCmd.Short) - fmt.Printf("%s: %s\n", commitCmd.Use, commitCmd.Short) - fmt.Printf("%s: %s\n", setOptionCmd.Use, setOptionCmd.Short) - fmt.Println("Use \"[command] --help\" for more information about a command.") - - return nil -} - -// Have the application echo a message -func cmdEcho(cmd *cobra.Command, args []string) error { - msg := "" - if len(args) > 0 { - msg = args[0] - } - res, err := client.EchoSync(msg) - if err != nil { - return err - } - printResponse(cmd, args, response{ - Data: []byte(res.Message), - }) - return nil -} - -// Get some info from the application -func cmdInfo(cmd *cobra.Command, args []string) error { - var version string - if len(args) == 1 { - version = args[0] - } - res, err := client.InfoSync(types.RequestInfo{Version: version}) - if err != nil { - return err - } - printResponse(cmd, args, response{ - Data: []byte(res.Data), - }) - return nil -} - -const codeBad uint32 = 10 - -// Set an option on the application -func cmdSetOption(cmd *cobra.Command, args []string) error { - if len(args) < 2 { - printResponse(cmd, args, response{ - Code: codeBad, - Log: "want at least arguments of the form: ", - }) - return nil - } - - key, val := args[0], args[1] - _, err := client.SetOptionSync(types.RequestSetOption{Key: key, Value: val}) - if err != nil { - return err - } - printResponse(cmd, args, response{Log: "OK (SetOption doesn't return anything.)"}) // NOTE: Nothing to show... - return nil -} - -// Append a new tx to application -func cmdDeliverTx(cmd *cobra.Command, args []string) error { - if len(args) == 0 { - printResponse(cmd, args, response{ - Code: codeBad, - Log: "want the tx", - }) - return nil - } - txBytes, err := stringOrHexToBytes(args[0]) - if err != nil { - return err - } - res, err := client.DeliverTxSync(types.RequestDeliverTx{Tx: txBytes}) - if err != nil { - return err - } - printResponse(cmd, args, response{ - Code: res.Code, - Data: res.Data, - Info: res.Info, - Log: res.Log, - }) - return nil -} - -// Validate a tx -func cmdCheckTx(cmd *cobra.Command, args []string) error { - if len(args) == 0 { - printResponse(cmd, args, response{ - Code: codeBad, - Info: "want the tx", - }) - return nil - } - txBytes, err := stringOrHexToBytes(args[0]) - if err != nil { - return err - } - res, err := client.CheckTxSync(types.RequestCheckTx{Tx: txBytes}) - if err != nil { - return err - } - printResponse(cmd, args, response{ - Code: res.Code, - Data: res.Data, - Info: res.Info, - Log: res.Log, - }) - return nil -} - -// Get application Merkle root hash -func cmdCommit(cmd *cobra.Command, args []string) error { - res, err := client.CommitSync() - if err != nil { - return err - } - printResponse(cmd, args, response{ - Data: res.Data, - }) - return nil -} - -// Query application state -func cmdQuery(cmd *cobra.Command, args []string) error { - if len(args) == 0 { - printResponse(cmd, args, response{ - Code: codeBad, - Info: "want the query", - Log: "", - }) - return nil - } - queryBytes, err := stringOrHexToBytes(args[0]) - if err != nil { - return err - } - - resQuery, err := client.QuerySync(types.RequestQuery{ - Data: queryBytes, - Path: flagPath, - Height: int64(flagHeight), - Prove: flagProve, - }) - if err != nil { - return err - } - printResponse(cmd, args, response{ - Code: resQuery.Code, - Info: resQuery.Info, - Log: resQuery.Log, - Query: &queryResponse{ - Key: resQuery.Key, - Value: resQuery.Value, - Height: resQuery.Height, - Proof: resQuery.Proof, - }, - }) - return nil -} - -func cmdCounter(cmd *cobra.Command, args []string) error { - app := counter.NewApplication(flagSerial) - logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) - - // Start the listener - srv, err := server.NewServer(flagAddress, flagAbci, app) - if err != nil { - return err - } - srv.SetLogger(logger.With("module", "abci-server")) - if err := srv.Start(); err != nil { - return err - } - - // Stop upon receiving SIGTERM or CTRL-C. - tmos.TrapSignal(logger, func() { - // Cleanup - srv.Stop() - }) - - // Run forever. - select {} -} - -func cmdKVStore(cmd *cobra.Command, args []string) error { - logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) - - // Create the application - in memory or persisted to disk - var app types.Application - if flagPersist == "" { - app = kvstore.NewApplication() - } else { - app = kvstore.NewPersistentKVStoreApplication(flagPersist) - app.(*kvstore.PersistentKVStoreApplication).SetLogger(logger.With("module", "kvstore")) - } - - // Start the listener - srv, err := server.NewServer(flagAddress, flagAbci, app) - if err != nil { - return err - } - srv.SetLogger(logger.With("module", "abci-server")) - if err := srv.Start(); err != nil { - return err - } - - // Stop upon receiving SIGTERM or CTRL-C. - tmos.TrapSignal(logger, func() { - // Cleanup - srv.Stop() - }) - - // Run forever. - select {} -} - -//-------------------------------------------------------------------------------- - -func printResponse(cmd *cobra.Command, args []string, rsp response) { - - if flagVerbose { - fmt.Println(">", cmd.Use, strings.Join(args, " ")) - } - - // Always print the status code. - if rsp.Code == types.CodeTypeOK { - fmt.Printf("-> code: OK\n") - } else { - fmt.Printf("-> code: %d\n", rsp.Code) - - } - - if len(rsp.Data) != 0 { - // Do no print this line when using the commit command - // because the string comes out as gibberish - if cmd.Use != "commit" { - fmt.Printf("-> data: %s\n", rsp.Data) - } - fmt.Printf("-> data.hex: 0x%X\n", rsp.Data) - } - if rsp.Log != "" { - fmt.Printf("-> log: %s\n", rsp.Log) - } - - if rsp.Query != nil { - fmt.Printf("-> height: %d\n", rsp.Query.Height) - if rsp.Query.Key != nil { - fmt.Printf("-> key: %s\n", rsp.Query.Key) - fmt.Printf("-> key.hex: %X\n", rsp.Query.Key) - } - if rsp.Query.Value != nil { - fmt.Printf("-> value: %s\n", rsp.Query.Value) - fmt.Printf("-> value.hex: %X\n", rsp.Query.Value) - } - if rsp.Query.Proof != nil { - fmt.Printf("-> proof: %#v\n", rsp.Query.Proof) - } - } -} - -// NOTE: s is interpreted as a string unless prefixed with 0x -func stringOrHexToBytes(s string) ([]byte, error) { - if len(s) > 2 && strings.ToLower(s[:2]) == "0x" { - b, err := hex.DecodeString(s[2:]) - if err != nil { - err = fmt.Errorf("error decoding hex argument: %s", err.Error()) - return nil, err - } - return b, nil - } - - if !strings.HasPrefix(s, "\"") || !strings.HasSuffix(s, "\"") { - err := fmt.Errorf("invalid string arg: \"%s\". Must be quoted or a \"0x\"-prefixed hex string", s) - return nil, err - } - - return []byte(s[1 : len(s)-1]), nil -} diff --git a/abci/cmd/abci-cli/main.go b/abci/cmd/abci-cli/main.go deleted file mode 100644 index a927e7ed8..000000000 --- a/abci/cmd/abci-cli/main.go +++ /dev/null @@ -1,14 +0,0 @@ -package main - -import ( - "fmt" - "os" -) - -func main() { - err := Execute() - if err != nil { - fmt.Print(err) - os.Exit(1) - } -} diff --git a/abci/example/example.go b/abci/example/example.go deleted file mode 100644 index ee491c1b5..000000000 --- a/abci/example/example.go +++ /dev/null @@ -1,3 +0,0 @@ -package example - -// so the go tool doesn't return errors about no buildable go files ... diff --git a/abci/example/example_test.go b/abci/example/example_test.go deleted file mode 100644 index d40976015..000000000 --- a/abci/example/example_test.go +++ /dev/null @@ -1,156 +0,0 @@ -package example - -import ( - "fmt" - "net" - "reflect" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "google.golang.org/grpc" - - "golang.org/x/net/context" - - "github.com/tendermint/tendermint/libs/log" - tmnet "github.com/tendermint/tendermint/libs/net" - - abcicli "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/abci/example/code" - "github.com/tendermint/tendermint/abci/example/kvstore" - abciserver "github.com/tendermint/tendermint/abci/server" - "github.com/tendermint/tendermint/abci/types" -) - -func TestKVStore(t *testing.T) { - fmt.Println("### Testing KVStore") - testStream(t, kvstore.NewApplication()) -} - -func TestBaseApp(t *testing.T) { - fmt.Println("### Testing BaseApp") - testStream(t, types.NewBaseApplication()) -} - -func TestGRPC(t *testing.T) { - fmt.Println("### Testing GRPC") - testGRPCSync(t, types.NewGRPCApplication(types.NewBaseApplication())) -} - -func testStream(t *testing.T, app types.Application) { - numDeliverTxs := 20000 - - // Start the listener - server := abciserver.NewSocketServer("unix://test.sock", app) - server.SetLogger(log.TestingLogger().With("module", "abci-server")) - if err := server.Start(); err != nil { - require.NoError(t, err, "Error starting socket server") - } - defer server.Stop() - - // Connect to the socket - client := abcicli.NewSocketClient("unix://test.sock", false) - client.SetLogger(log.TestingLogger().With("module", "abci-client")) - if err := client.Start(); err != nil { - t.Fatalf("Error starting socket client: %v", err.Error()) - } - defer client.Stop() - - done := make(chan struct{}) - counter := 0 - client.SetResponseCallback(func(req *types.Request, res *types.Response) { - // Process response - switch r := res.Value.(type) { - case *types.Response_DeliverTx: - counter++ - if r.DeliverTx.Code != code.CodeTypeOK { - t.Error("DeliverTx failed with ret_code", r.DeliverTx.Code) - } - if counter > numDeliverTxs { - t.Fatalf("Too many DeliverTx responses. Got %d, expected %d", counter, numDeliverTxs) - } - if counter == numDeliverTxs { - go func() { - time.Sleep(time.Second * 1) // Wait for a bit to allow counter overflow - close(done) - }() - return - } - case *types.Response_Flush: - // ignore - default: - t.Error("Unexpected response type", reflect.TypeOf(res.Value)) - } - }) - - // Write requests - for counter := 0; counter < numDeliverTxs; counter++ { - // Send request - reqRes := client.DeliverTxAsync(types.RequestDeliverTx{Tx: []byte("test")}) - _ = reqRes - // check err ? - - // Sometimes send flush messages - if counter%123 == 0 { - client.FlushAsync() - // check err ? - } - } - - // Send final flush message - client.FlushAsync() - - <-done -} - -//------------------------- -// test grpc - -func dialerFunc(ctx context.Context, addr string) (net.Conn, error) { - return tmnet.Connect(addr) -} - -func testGRPCSync(t *testing.T, app types.ABCIApplicationServer) { - numDeliverTxs := 2000 - - // Start the listener - server := abciserver.NewGRPCServer("unix://test.sock", app) - server.SetLogger(log.TestingLogger().With("module", "abci-server")) - if err := server.Start(); err != nil { - t.Fatalf("Error starting GRPC server: %v", err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc)) - if err != nil { - t.Fatalf("Error dialing GRPC server: %v", err.Error()) - } - defer conn.Close() - - client := types.NewABCIApplicationClient(conn) - - // Write requests - for counter := 0; counter < numDeliverTxs; counter++ { - // Send request - response, err := client.DeliverTx(context.Background(), &types.RequestDeliverTx{Tx: []byte("test")}) - if err != nil { - t.Fatalf("Error in GRPC DeliverTx: %v", err.Error()) - } - counter++ - if response.Code != code.CodeTypeOK { - t.Error("DeliverTx failed with ret_code", response.Code) - } - if counter > numDeliverTxs { - t.Fatal("Too many DeliverTx responses") - } - t.Log("response", counter) - if counter == numDeliverTxs { - go func() { - time.Sleep(time.Second * 1) // Wait for a bit to allow counter overflow - }() - } - - } -} diff --git a/abci/example/kvstore/kvstore_test.go b/abci/example/kvstore/kvstore_test.go index 4d8c829ad..b29dca993 100644 --- a/abci/example/kvstore/kvstore_test.go +++ b/abci/example/kvstore/kvstore_test.go @@ -2,19 +2,13 @@ package kvstore import ( "bytes" - "fmt" "io/ioutil" "sort" "testing" "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/service" - - abcicli "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/code" - abciserver "github.com/tendermint/tendermint/abci/server" "github.com/tendermint/tendermint/abci/types" ) @@ -225,118 +219,3 @@ func valsEqual(t *testing.T, vals1, vals2 []types.ValidatorUpdate) { } } } - -func makeSocketClientServer(app types.Application, name string) (abcicli.Client, service.Service, error) { - // Start the listener - socket := fmt.Sprintf("unix://%s.sock", name) - logger := log.TestingLogger() - - server := abciserver.NewSocketServer(socket, app) - server.SetLogger(logger.With("module", "abci-server")) - if err := server.Start(); err != nil { - return nil, nil, err - } - - // Connect to the socket - client := abcicli.NewSocketClient(socket, false) - client.SetLogger(logger.With("module", "abci-client")) - if err := client.Start(); err != nil { - server.Stop() - return nil, nil, err - } - - return client, server, nil -} - -func makeGRPCClientServer(app types.Application, name string) (abcicli.Client, service.Service, error) { - // Start the listener - socket := fmt.Sprintf("unix://%s.sock", name) - logger := log.TestingLogger() - - gapp := types.NewGRPCApplication(app) - server := abciserver.NewGRPCServer(socket, gapp) - server.SetLogger(logger.With("module", "abci-server")) - if err := server.Start(); err != nil { - return nil, nil, err - } - - client := abcicli.NewGRPCClient(socket, true) - client.SetLogger(logger.With("module", "abci-client")) - if err := client.Start(); err != nil { - server.Stop() - return nil, nil, err - } - return client, server, nil -} - -func TestClientServer(t *testing.T) { - // set up socket app - kvstore := NewApplication() - client, server, err := makeSocketClientServer(kvstore, "kvstore-socket") - require.Nil(t, err) - defer server.Stop() - defer client.Stop() - - runClientTests(t, client) - - // set up grpc app - kvstore = NewApplication() - gclient, gserver, err := makeGRPCClientServer(kvstore, "kvstore-grpc") - require.Nil(t, err) - defer gserver.Stop() - defer gclient.Stop() - - runClientTests(t, gclient) -} - -func runClientTests(t *testing.T, client abcicli.Client) { - // run some tests.... - key := testKey - value := key - tx := []byte(key) - testClient(t, client, tx, key, value) - - value = testValue - tx = []byte(key + "=" + value) - testClient(t, client, tx, key, value) -} - -func testClient(t *testing.T, app abcicli.Client, tx []byte, key, value string) { - ar, err := app.DeliverTxSync(types.RequestDeliverTx{Tx: tx}) - require.NoError(t, err) - require.False(t, ar.IsErr(), ar) - // repeating tx doesn't raise error - ar, err = app.DeliverTxSync(types.RequestDeliverTx{Tx: tx}) - require.NoError(t, err) - require.False(t, ar.IsErr(), ar) - // commit - _, err = app.CommitSync() - require.NoError(t, err) - - info, err := app.InfoSync(types.RequestInfo{}) - require.NoError(t, err) - require.NotZero(t, info.LastBlockHeight) - - // make sure query is fine - resQuery, err := app.QuerySync(types.RequestQuery{ - Path: "/store", - Data: []byte(key), - }) - require.Nil(t, err) - require.Equal(t, code.CodeTypeOK, resQuery.Code) - require.Equal(t, key, string(resQuery.Key)) - require.Equal(t, value, string(resQuery.Value)) - require.EqualValues(t, info.LastBlockHeight, resQuery.Height) - - // make sure proof is fine - resQuery, err = app.QuerySync(types.RequestQuery{ - Path: "/store", - Data: []byte(key), - Prove: true, - }) - require.Nil(t, err) - require.Equal(t, code.CodeTypeOK, resQuery.Code) - require.Equal(t, key, string(resQuery.Key)) - require.Equal(t, value, string(resQuery.Value)) - require.EqualValues(t, info.LastBlockHeight, resQuery.Height) -} diff --git a/abci/tests/client_server_test.go b/abci/tests/client_server_test.go deleted file mode 100644 index 2ef64e66a..000000000 --- a/abci/tests/client_server_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package tests - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - abciclient "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/abci/example/kvstore" - abciserver "github.com/tendermint/tendermint/abci/server" -) - -func TestClientServerNoAddrPrefix(t *testing.T) { - addr := "localhost:26658" - transport := "socket" - app := kvstore.NewApplication() - - server, err := abciserver.NewServer(addr, transport, app) - assert.NoError(t, err, "expected no error on NewServer") - err = server.Start() - assert.NoError(t, err, "expected no error on server.Start") - - client, err := abciclient.NewClient(addr, transport, true) - assert.NoError(t, err, "expected no error on NewClient") - err = client.Start() - assert.NoError(t, err, "expected no error on client.Start") -} diff --git a/abci/tests/server/client.go b/abci/tests/server/client.go deleted file mode 100644 index 36989f6ac..000000000 --- a/abci/tests/server/client.go +++ /dev/null @@ -1,96 +0,0 @@ -package testsuite - -import ( - "bytes" - "errors" - "fmt" - - abcicli "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/abci/types" - tmrand "github.com/tendermint/tendermint/libs/rand" -) - -func InitChain(client abcicli.Client) error { - total := 10 - vals := make([]types.ValidatorUpdate, total) - for i := 0; i < total; i++ { - pubkey := tmrand.Bytes(33) - power := tmrand.Int() - vals[i] = types.Ed25519ValidatorUpdate(pubkey, int64(power)) - } - _, err := client.InitChainSync(types.RequestInitChain{ - Validators: vals, - }) - if err != nil { - fmt.Printf("Failed test: InitChain - %v\n", err) - return err - } - fmt.Println("Passed test: InitChain") - return nil -} - -func SetOption(client abcicli.Client, key, value string) error { - _, err := client.SetOptionSync(types.RequestSetOption{Key: key, Value: value}) - if err != nil { - fmt.Println("Failed test: SetOption") - fmt.Printf("error while setting %v=%v: \nerror: %v\n", key, value, err) - return err - } - fmt.Println("Passed test: SetOption") - return nil -} - -func Commit(client abcicli.Client, hashExp []byte) error { - res, err := client.CommitSync() - data := res.Data - if err != nil { - fmt.Println("Failed test: Commit") - fmt.Printf("error while committing: %v\n", err) - return err - } - if !bytes.Equal(data, hashExp) { - fmt.Println("Failed test: Commit") - fmt.Printf("Commit hash was unexpected. Got %X expected %X\n", data, hashExp) - return errors.New("commitTx failed") - } - fmt.Println("Passed test: Commit") - return nil -} - -func DeliverTx(client abcicli.Client, txBytes []byte, codeExp uint32, dataExp []byte) error { - res, _ := client.DeliverTxSync(types.RequestDeliverTx{Tx: txBytes}) - code, data, log := res.Code, res.Data, res.Log - if code != codeExp { - fmt.Println("Failed test: DeliverTx") - fmt.Printf("DeliverTx response code was unexpected. Got %v expected %v. Log: %v\n", - code, codeExp, log) - return errors.New("deliverTx error") - } - if !bytes.Equal(data, dataExp) { - fmt.Println("Failed test: DeliverTx") - fmt.Printf("DeliverTx response data was unexpected. Got %X expected %X\n", - data, dataExp) - return errors.New("deliverTx error") - } - fmt.Println("Passed test: DeliverTx") - return nil -} - -func CheckTx(client abcicli.Client, txBytes []byte, codeExp uint32, dataExp []byte) error { - res, _ := client.CheckTxSync(types.RequestCheckTx{Tx: txBytes}) - code, data, log := res.Code, res.Data, res.Log - if code != codeExp { - fmt.Println("Failed test: CheckTx") - fmt.Printf("CheckTx response code was unexpected. Got %v expected %v. Log: %v\n", - code, codeExp, log) - return errors.New("checkTx") - } - if !bytes.Equal(data, dataExp) { - fmt.Println("Failed test: CheckTx") - fmt.Printf("CheckTx response data was unexpected. Got %X expected %X\n", - data, dataExp) - return errors.New("checkTx") - } - fmt.Println("Passed test: CheckTx") - return nil -} diff --git a/abci/tests/test_app/app.go b/abci/tests/test_app/app.go deleted file mode 100644 index 9c32fcc7d..000000000 --- a/abci/tests/test_app/app.go +++ /dev/null @@ -1,78 +0,0 @@ -package main - -import ( - "bytes" - "fmt" - "os" - - abcicli "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/log" -) - -func startClient(abciType string) abcicli.Client { - // Start client - client, err := abcicli.NewClient("tcp://127.0.0.1:26658", abciType, true) - if err != nil { - panic(err.Error()) - } - logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) - client.SetLogger(logger.With("module", "abcicli")) - if err := client.Start(); err != nil { - panicf("connecting to abci_app: %v", err.Error()) - } - - return client -} - -func setOption(client abcicli.Client, key, value string) { - _, err := client.SetOptionSync(types.RequestSetOption{Key: key, Value: value}) - if err != nil { - panicf("setting %v=%v: \nerr: %v", key, value, err) - } -} - -func commit(client abcicli.Client, hashExp []byte) { - res, err := client.CommitSync() - if err != nil { - panicf("client error: %v", err) - } - if !bytes.Equal(res.Data, hashExp) { - panicf("Commit hash was unexpected. Got %X expected %X", res.Data, hashExp) - } -} - -func deliverTx(client abcicli.Client, txBytes []byte, codeExp uint32, dataExp []byte) { - res, err := client.DeliverTxSync(types.RequestDeliverTx{Tx: txBytes}) - if err != nil { - panicf("client error: %v", err) - } - if res.Code != codeExp { - panicf("DeliverTx response code was unexpected. Got %v expected %v. Log: %v", res.Code, codeExp, res.Log) - } - if !bytes.Equal(res.Data, dataExp) { - panicf("DeliverTx response data was unexpected. Got %X expected %X", res.Data, dataExp) - } -} - -/*func checkTx(client abcicli.Client, txBytes []byte, codeExp uint32, dataExp []byte) { - res, err := client.CheckTxSync(txBytes) - if err != nil { - panicf("client error: %v", err) - } - if res.IsErr() { - panicf("checking tx %X: %v\nlog: %v", txBytes, res.Log) - } - if res.Code != codeExp { - panicf("CheckTx response code was unexpected. Got %v expected %v. Log: %v", - res.Code, codeExp, res.Log) - } - if !bytes.Equal(res.Data, dataExp) { - panicf("CheckTx response data was unexpected. Got %X expected %X", - res.Data, dataExp) - } -}*/ - -func panicf(format string, a ...interface{}) { - panic(fmt.Sprintf(format, a...)) -} diff --git a/abci/tests/test_app/main.go b/abci/tests/test_app/main.go deleted file mode 100644 index ca298d7e2..000000000 --- a/abci/tests/test_app/main.go +++ /dev/null @@ -1,85 +0,0 @@ -package main - -import ( - "fmt" - "log" - "os" - "os/exec" - "time" - - "github.com/tendermint/tendermint/abci/example/code" - "github.com/tendermint/tendermint/abci/types" -) - -var abciType string - -func init() { - abciType = os.Getenv("ABCI") - if abciType == "" { - abciType = "socket" - } -} - -func main() { - testCounter() -} - -const ( - maxABCIConnectTries = 10 -) - -func ensureABCIIsUp(typ string, n int) error { - var err error - cmdString := "abci-cli echo hello" - if typ == "grpc" { - cmdString = "abci-cli --abci grpc echo hello" - } - - for i := 0; i < n; i++ { - cmd := exec.Command("bash", "-c", cmdString) - _, err = cmd.CombinedOutput() - if err == nil { - break - } - <-time.After(500 * time.Millisecond) - } - return err -} - -func testCounter() { - abciApp := os.Getenv("ABCI_APP") - if abciApp == "" { - panic("No ABCI_APP specified") - } - - fmt.Printf("Running %s test with abci=%s\n", abciApp, abciType) - subCommand := fmt.Sprintf("abci-cli %s", abciApp) - cmd := exec.Command("bash", "-c", subCommand) - cmd.Stdout = os.Stdout - if err := cmd.Start(); err != nil { - log.Fatalf("starting %q err: %v", abciApp, err) - } - defer cmd.Wait() - defer cmd.Process.Kill() - - if err := ensureABCIIsUp(abciType, maxABCIConnectTries); err != nil { - log.Fatalf("echo failed: %v", err) - } - - client := startClient(abciType) - defer client.Stop() - - setOption(client, "serial", "on") - commit(client, nil) - deliverTx(client, []byte("abc"), code.CodeTypeBadNonce, nil) - commit(client, nil) - deliverTx(client, []byte{0x00}, types.CodeTypeOK, nil) - commit(client, []byte{0, 0, 0, 0, 0, 0, 0, 1}) - deliverTx(client, []byte{0x00}, code.CodeTypeBadNonce, nil) - deliverTx(client, []byte{0x01}, types.CodeTypeOK, nil) - deliverTx(client, []byte{0x00, 0x02}, types.CodeTypeOK, nil) - deliverTx(client, []byte{0x00, 0x03}, types.CodeTypeOK, nil) - deliverTx(client, []byte{0x00, 0x00, 0x04}, types.CodeTypeOK, nil) - deliverTx(client, []byte{0x00, 0x00, 0x06}, code.CodeTypeBadNonce, nil) - commit(client, []byte{0, 0, 0, 0, 0, 0, 0, 5}) -} diff --git a/abci/tests/test_app/test.sh b/abci/tests/test_app/test.sh deleted file mode 100755 index 0d8301831..000000000 --- a/abci/tests/test_app/test.sh +++ /dev/null @@ -1,28 +0,0 @@ -#! /bin/bash -set -e - -# These tests spawn the counter app and server by execing the ABCI_APP command and run some simple client tests against it - -# Get the directory of where this script is. -export PATH="$GOBIN:$PATH" -SOURCE="${BASH_SOURCE[0]}" -while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done -DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" - -# Change into that dir because we expect that. -cd "$DIR" - -echo "RUN COUNTER OVER SOCKET" -# test golang counter -ABCI_APP="counter" go run -mod=readonly ./*.go -echo "----------------------" - - -echo "RUN COUNTER OVER GRPC" -# test golang counter via grpc -ABCI_APP="counter --abci=grpc" ABCI="grpc" go run -mod=readonly ./*.go -echo "----------------------" - -# test nodejs counter -# TODO: fix node app -#ABCI_APP="node $GOPATH/src/github.com/tendermint/js-abci/example/app.js" go test -test.run TestCounter diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 17ab83f33..f7fd603b9 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "fmt" "io/ioutil" - mrand "math/rand" "os" "path/filepath" "testing" @@ -19,12 +18,10 @@ import ( "github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/kvstore" - abciserver "github.com/tendermint/tendermint/abci/server" abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" - "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -538,61 +535,6 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) } -// This will non-deterministically catch some concurrency failures like -// https://github.com/tendermint/tendermint/issues/3509 -// TODO: all of the tests should probably also run using the remote proxy app -// since otherwise we're not actually testing the concurrency of the mempool here! -func TestMempoolRemoteAppConcurrency(t *testing.T) { - sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - app := kvstore.NewApplication() - cc, server := newRemoteApp(t, sockPath, app) - defer server.Stop() - config := cfg.ResetTestRoot("mempool_test") - mempool, cleanup := newMempoolWithAppAndConfig(cc, config) - defer cleanup() - - // generate small number of txs - nTxs := 10 - txLen := 200 - txs := make([]types.Tx, nTxs) - for i := 0; i < nTxs; i++ { - txs[i] = tmrand.Bytes(txLen) - } - - // simulate a group of peers sending them over and over - N := config.Mempool.Size - maxPeers := 5 - for i := 0; i < N; i++ { - peerID := mrand.Intn(maxPeers) - txNum := mrand.Intn(nTxs) - tx := txs[txNum] - - // this will err with ErrTxInCache many times ... - mempool.CheckTx(tx, nil, TxInfo{SenderID: uint16(peerID)}) - } - err := mempool.FlushAppConn() - require.NoError(t, err) -} - -// caller must close server -func newRemoteApp( - t *testing.T, - addr string, - app abci.Application, -) ( - clientCreator proxy.ClientCreator, - server service.Service, -) { - clientCreator = proxy.NewRemoteClientCreator(addr, "socket", true) - - // Start server - server = abciserver.NewSocketServer(addr, app) - server.SetLogger(log.TestingLogger().With("module", "abci-server")) - if err := server.Start(); err != nil { - t.Fatalf("Error starting socket server: %v", err.Error()) - } - return clientCreator, server -} func checksumIt(data []byte) string { h := sha256.New() h.Write(data) diff --git a/node/node_test.go b/node/node_test.go index cd52ad06e..6244c03ee 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -304,7 +304,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { n, err := NewNode(config, privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()), nodeKey, - proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), + proxy.DefaultClientCreator(config.ProxyApp, config.DBDir()), DefaultGenesisDocProviderFunc(config), DefaultDBProvider, DefaultMetricsProvider(config.Instrumentation), diff --git a/proxy/app_conn_test.go b/proxy/app_conn_test.go deleted file mode 100644 index ca15f8977..000000000 --- a/proxy/app_conn_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package proxy - -import ( - "fmt" - "strings" - "testing" - - abcicli "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/abci/example/kvstore" - "github.com/tendermint/tendermint/abci/server" - "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/log" - tmrand "github.com/tendermint/tendermint/libs/rand" -) - -//---------------------------------------- - -type AppConnTest interface { - EchoAsync(string) *abcicli.ReqRes - FlushSync() error - InfoSync(types.RequestInfo) (*types.ResponseInfo, error) -} - -type appConnTest struct { - appConn abcicli.Client -} - -func NewAppConnTest(appConn abcicli.Client) AppConnTest { - return &appConnTest{appConn} -} - -func (app *appConnTest) EchoAsync(msg string) *abcicli.ReqRes { - return app.appConn.EchoAsync(msg) -} - -func (app *appConnTest) FlushSync() error { - return app.appConn.FlushSync() -} - -func (app *appConnTest) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { - return app.appConn.InfoSync(req) -} - -//---------------------------------------- - -var SOCKET = "socket" - -func TestEcho(t *testing.T) { - sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) - - // Start server - s := server.NewSocketServer(sockPath, kvstore.NewApplication()) - s.SetLogger(log.TestingLogger().With("module", "abci-server")) - if err := s.Start(); err != nil { - t.Fatalf("Error starting socket server: %v", err.Error()) - } - defer s.Stop() - - // Start client - cli, err := clientCreator.NewABCIClient() - if err != nil { - t.Fatalf("Error creating ABCI client: %v", err.Error()) - } - cli.SetLogger(log.TestingLogger().With("module", "abci-client")) - if err := cli.Start(); err != nil { - t.Fatalf("Error starting ABCI client: %v", err.Error()) - } - - proxy := NewAppConnTest(cli) - t.Log("Connected") - - for i := 0; i < 1000; i++ { - proxy.EchoAsync(fmt.Sprintf("echo-%v", i)) - } - if err := proxy.FlushSync(); err != nil { - t.Error(err) - } -} - -func BenchmarkEcho(b *testing.B) { - b.StopTimer() // Initialize - sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) - - // Start server - s := server.NewSocketServer(sockPath, kvstore.NewApplication()) - s.SetLogger(log.TestingLogger().With("module", "abci-server")) - if err := s.Start(); err != nil { - b.Fatalf("Error starting socket server: %v", err.Error()) - } - defer s.Stop() - - // Start client - cli, err := clientCreator.NewABCIClient() - if err != nil { - b.Fatalf("Error creating ABCI client: %v", err.Error()) - } - cli.SetLogger(log.TestingLogger().With("module", "abci-client")) - if err := cli.Start(); err != nil { - b.Fatalf("Error starting ABCI client: %v", err.Error()) - } - - proxy := NewAppConnTest(cli) - b.Log("Connected") - echoString := strings.Repeat(" ", 200) - b.StartTimer() // Start benchmarking tests - - for i := 0; i < b.N; i++ { - proxy.EchoAsync(echoString) - } - if err := proxy.FlushSync(); err != nil { - b.Error(err) - } - - b.StopTimer() - // info := proxy.InfoSync(types.RequestInfo{""}) - //b.Log("N: ", b.N, info) -} - -func TestInfo(t *testing.T) { - sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) - - // Start server - s := server.NewSocketServer(sockPath, kvstore.NewApplication()) - s.SetLogger(log.TestingLogger().With("module", "abci-server")) - if err := s.Start(); err != nil { - t.Fatalf("Error starting socket server: %v", err.Error()) - } - defer s.Stop() - - // Start client - cli, err := clientCreator.NewABCIClient() - if err != nil { - t.Fatalf("Error creating ABCI client: %v", err.Error()) - } - cli.SetLogger(log.TestingLogger().With("module", "abci-client")) - if err := cli.Start(); err != nil { - t.Fatalf("Error starting ABCI client: %v", err.Error()) - } - - proxy := NewAppConnTest(cli) - t.Log("Connected") - - resInfo, err := proxy.InfoSync(RequestInfo) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if resInfo.Data != "{\"size\":0}" { - t.Error("Expected ResponseInfo with one element '{\"size\":0}' but got something else") - } -}