diff --git a/store/streaming/file/server/cmd/root.go b/store/streaming/file/server/cmd/root.go index ca76434d63d..15478f5386e 100644 --- a/store/streaming/file/server/cmd/root.go +++ b/store/streaming/file/server/cmd/root.go @@ -27,7 +27,7 @@ const ( var ( configPath string - serverCfg *config.StateServerConfig + serverCfg *config.StateFileServerConfig interfaceRegistry = codecTypes.NewInterfaceRegistry() marshaller = codec.NewProtoCodec(interfaceRegistry) ) @@ -37,6 +37,7 @@ var rootCmd = &cobra.Command{ PersistentPreRun: initFuncs, } +// Execute is the top level command entry point func Execute() { log.Print("----- Starting Cosmos state change file server -----") if err := rootCmd.Execute(); err != nil { @@ -79,7 +80,7 @@ func initFuncs(cmd *cobra.Command, args []string) { } func initConfig() { - serverCfg = config.DefaultStateServerConfig() + serverCfg = config.DefaultStateFileServerConfig() if configPath != "" { switch _, err := os.Stat(configPath); { case os.IsNotExist(err): diff --git a/store/streaming/file/server/cmd/serve.go b/store/streaming/file/server/cmd/serve.go index cac3205a5ee..10b49d264a7 100644 --- a/store/streaming/file/server/cmd/serve.go +++ b/store/streaming/file/server/cmd/serve.go @@ -18,14 +18,6 @@ import ( ) const ( - tomlFlagRemoveAfter = "file-server.remove-after" - tomlFlagFilePrefix = "file-server.file-prefix" - tomlFlagReadDir = "file-server.read-dir" - tomlFlagChainID = "file-server.chain-id" - tomlFlagGRPCAddress = "file-server.grpc-address" - tomlFlagGRPCWebEnable = "file-server.grpc-web-enable" - tomlFlagGRPCWebAddress = "file-server.grpc-web-address" - cliFlagRemoveAfter = "remove-after" cliFlagFilePrefix = "file-prefix" cliFlagReadDir = "read-dir" @@ -33,6 +25,14 @@ const ( cliFlagGRPCAddress = "grpc-address" cliFlagGRPCWebEnable = "grpc-web-enable" cliFlagGRPCWebAddress = "grpc-web-address" + + tomlFlagRemoveAfter = "file-server.remove-after" + tomlFlagFilePrefix = "file-server.file-prefix" + tomlFlagReadDir = "file-server.read-dir" + tomlFlagChainID = "file-server.chain-id" + tomlFlagGRPCAddress = "file-server.grpc-address" + tomlFlagGRPCWebEnable = "file-server.grpc-web-enable" + tomlFlagGRPCWebAddress = "file-server.grpc-web-address" ) var ( diff --git a/store/streaming/file/server/config/config.go b/store/streaming/file/server/config/config.go index 1ba666b20c3..c285faa1225 100644 --- a/store/streaming/file/server/config/config.go +++ b/store/streaming/file/server/config/config.go @@ -20,7 +20,8 @@ const ( DefaultGRPCWebAddress = "0.0.0.0:9093" ) -type StateServerConfig struct { +// StateFileServerConfig contains configuration parameters for the state file server +type StateFileServerConfig struct { GRPCAddress string `mapstructure:"grpc-address"` GRPCWebEnabled bool `mapstructure:"grpc-web-enabled"` GRPCWebAddress string `mapstructure:"grpc-web-address"` @@ -32,18 +33,18 @@ type StateServerConfig struct { LogLevel string `mapstructure:"log-level"` } -// DefaultStateServerConfig returns the reference to ClientConfig with default values. -func DefaultStateServerConfig() *StateServerConfig { - return &StateServerConfig{ - DefaultGRPCAddress, - true, - DefaultGRPCWebAddress, - "", - DefaultReadDir, - "", - true, - "", - "info", +// DefaultStateFileServerConfig returns the reference to ClientConfig with default values. +func DefaultStateFileServerConfig() *StateFileServerConfig { + return &StateFileServerConfig{ + GRPCAddress: DefaultGRPCAddress, + GRPCWebEnabled: true, + GRPCWebAddress: DefaultGRPCWebAddress, + ChainID: "", + ReadDir: DefaultReadDir, + FilePrefix: "", + RemoveAfter: true, + LogFile: "", + LogLevel: "info", } } @@ -61,7 +62,7 @@ func init() { // WriteConfigFile renders config using the template and writes it to // configFilePath. -func WriteConfigFile(configFilePath string, config *StateServerConfig) { +func WriteConfigFile(configFilePath string, config *StateFileServerConfig) { var buffer bytes.Buffer if err := configTemplate.Execute(&buffer, config); err != nil { diff --git a/store/streaming/file/server/grpc/backend.go b/store/streaming/file/server/grpc/backend.go index b300756dc94..5d540fe6a28 100644 --- a/store/streaming/file/server/grpc/backend.go +++ b/store/streaming/file/server/grpc/backend.go @@ -21,15 +21,17 @@ import ( "github.com/tendermint/tendermint/libs/log" ) +// StateFileBackend performs the state file reading and filtering to service Handler requests type StateFileBackend struct { - conf *config.StateServerConfig + conf *config.StateFileServerConfig codec *codec.ProtoCodec logger log.Logger trimPrefix string quitChan <-chan struct{} } -func NewStateFileBackend(conf *config.StateServerConfig, codec *codec.ProtoCodec, logger log.Logger, quitChan <-chan struct{}) *StateFileBackend { +// NewStateFileBackend returns a new StateFileBackend +func NewStateFileBackend(conf *config.StateFileServerConfig, codec *codec.ProtoCodec, logger log.Logger, quitChan <-chan struct{}) *StateFileBackend { trimPrefix := "block-" if conf.FilePrefix != "" { trimPrefix = fmt.Sprintf("%s-%s", conf.FilePrefix, trimPrefix) @@ -45,38 +47,42 @@ func NewStateFileBackend(conf *config.StateServerConfig, codec *codec.ProtoCodec // StreamData streams the requested state file data // this streams new data as it is written to disk -func (sfb *StateFileBackend) StreamData(req *pb.StreamRequest, res chan<- *pb.StreamResponse) error { +func (sfb *StateFileBackend) StreamData(req *pb.StreamRequest, res chan<- *pb.StreamResponse) (error, <-chan struct{}) { w, err := fsnotify.NewWatcher() if err != nil { - return err + return err, nil } + + done := make(chan struct{}) go func() { defer w.Close() + defer close(done) for { select { case event, ok := <-w.Events: - if !ok { + if !ok || event.Op != fsnotify.CloseWrite { continue } - if event.Op == fsnotify.CloseWrite { - fileName := event.Name - readFlag := false - switch { - case strings.Contains(fileName, "begin") && req.BeginBlock: - readFlag = true - res <- sfb.formBeginBlockResponse(fileName, req.StoreKeys) - case strings.Contains(fileName, "tx") && req.DeliverTx: - readFlag = true - res <- sfb.formDeliverTxResponse(fileName, req.StoreKeys) - case strings.Contains(fileName, "end") && req.EndBlock: - readFlag = true - res <- sfb.formEndBlockResponse(fileName, req.StoreKeys) - default: - } - if sfb.conf.RemoveAfter && readFlag { - if err := os.Remove(filepath.Join(sfb.conf.ReadDir, fileName)); err != nil { - sfb.logger.Error("unable to remove state change file", "err", err) - } + + fileName := event.Name + if sfb.conf.FilePrefix != "" && !strings.HasPrefix(fileName, sfb.conf.FilePrefix) { + continue + } + + switch { + case strings.Contains(fileName, "begin") && req.BeginBlock: + res <- sfb.formBeginBlockResponse(fileName, req.StoreKeys) + case strings.Contains(fileName, "tx") && req.DeliverTx: + res <- sfb.formDeliverTxResponse(fileName, req.StoreKeys) + case strings.Contains(fileName, "end") && req.EndBlock: + res <- sfb.formEndBlockResponse(fileName, req.StoreKeys) + default: + continue + } + + if sfb.conf.RemoveAfter { + if err := os.Remove(filepath.Join(sfb.conf.ReadDir, fileName)); err != nil { + sfb.logger.Error("unable to remove state change file", "err", err) } } case err, ok := <-w.Errors: @@ -90,55 +96,61 @@ func (sfb *StateFileBackend) StreamData(req *pb.StreamRequest, res chan<- *pb.St } } }() - return nil + return nil, done } // BackFillData stream the requested state file data // this stream data that is already written to disk -func (sfb *StateFileBackend) BackFillData(req *pb.StreamRequest, res chan<- *pb.StreamResponse) error { +func (sfb *StateFileBackend) BackFillData(req *pb.StreamRequest, res chan<- *pb.StreamResponse) (error, <-chan struct{}) { f, err := os.Open(sfb.conf.ReadDir) if err != nil { - return err + return err, nil } files, err := f.Readdir(-1) if err != nil { - return err + return err, nil } sort.Sort(filesByTimeModified(files)) + + done := make(chan struct{}) go func() { + defer close(done) for _, f := range files { - select { - // short circuit if the parent processes are shutting down + select { // short circuit if the parent processes are shutting down case <-sfb.quitChan: sfb.logger.Info("quiting StateFileBackend BackFillData process") return default: } + if f.IsDir() { continue } + fileName := f.Name() - readFlag := false + if sfb.conf.FilePrefix != "" && !strings.HasPrefix(fileName, sfb.conf.FilePrefix) { + continue + } + switch { case strings.Contains(fileName, "begin") && req.BeginBlock: - readFlag = true res <- sfb.formBeginBlockResponse(fileName, req.StoreKeys) case strings.Contains(fileName, "tx") && req.DeliverTx: - readFlag = true res <- sfb.formDeliverTxResponse(fileName, req.StoreKeys) case strings.Contains(fileName, "end") && req.EndBlock: - readFlag = true res <- sfb.formEndBlockResponse(fileName, req.StoreKeys) default: + continue } - if sfb.conf.RemoveAfter && readFlag { + + if sfb.conf.RemoveAfter { if err := os.Remove(filepath.Join(sfb.conf.ReadDir, fileName)); err != nil { sfb.logger.Error("unable to remove state change file", "err", err) } } } }() - return nil + return nil, done } // BeginBlockDataAt returns a BeginBlockPayload for the provided BeginBlockRequest @@ -168,20 +180,6 @@ func (sfb *StateFileBackend) EndBlockDataAt(ctx context.Context, req *pb.EndBloc return sfb.formEndBlockPayload(fileName, req.StoreKeys) } -type filesByTimeModified []os.FileInfo - -func (fs filesByTimeModified) Len() int { - return len(fs) -} - -func (fs filesByTimeModified) Swap(i, j int) { - fs[i], fs[j] = fs[j], fs[i] -} - -func (fs filesByTimeModified) Less(i, j int) bool { - return fs[i].ModTime().Before(fs[j].ModTime()) -} - func (sfb *StateFileBackend) formBeginBlockResponse(fileName string, storeKeys []string) *pb.StreamResponse { res := new(pb.StreamResponse) res.ChainId = sfb.conf.ChainID @@ -192,6 +190,7 @@ func (sfb *StateFileBackend) formBeginBlockResponse(fileName string, storeKeys [ return res } res.Height = blockHeight + bbp, err := sfb.formBeginBlockPayload(fileName, storeKeys) if err != nil { res.Err = err.Error() @@ -213,14 +212,17 @@ func (sfb *StateFileBackend) formBeginBlockPayload(fileName string, storeKeys [] if len(messageBytes) < 2 { return nil, fmt.Errorf("expected at least two protobuf messages, got %d", len(messageBytes)) } + beginBlockReq := new(abci.RequestBeginBlock) if err := sfb.codec.Unmarshal(messageBytes[0], beginBlockReq); err != nil { return nil, err } + beginBlockRes := new(abci.ResponseBeginBlock) if err := sfb.codec.Unmarshal(messageBytes[len(messageBytes)-1], beginBlockRes); err != nil { return nil, err } + kvPairs := make([]*types.StoreKVPair, 0, len(messageBytes[1:len(messageBytes)-2])) for i := 1; i < len(messageBytes)-1; i++ { kvPair := new(types.StoreKVPair) @@ -231,6 +233,7 @@ func (sfb *StateFileBackend) formBeginBlockPayload(fileName string, storeKeys [] kvPairs = append(kvPairs, kvPair) } } + return &pb.BeginBlockPayload{ Request: beginBlockReq, Response: beginBlockRes, @@ -248,6 +251,7 @@ func (sfb *StateFileBackend) formDeliverTxResponse(fileName string, storeKeys [] return res } res.Height = blockHeight + dtp, err := sfb.formDeliverTxPayload(fileName, storeKeys) if err != nil { res.Err = err.Error() @@ -269,14 +273,17 @@ func (sfb *StateFileBackend) formDeliverTxPayload(fileName string, storeKeys []s if len(messageBytes) < 2 { return nil, fmt.Errorf("expected at least two protobuf messages, got %d", len(messageBytes)) } + deliverTxReq := new(abci.RequestDeliverTx) if err := sfb.codec.Unmarshal(messageBytes[0], deliverTxReq); err != nil { return nil, err } + deliverTxRes := new(abci.ResponseDeliverTx) if err := sfb.codec.Unmarshal(messageBytes[len(messageBytes)-1], deliverTxRes); err != nil { return nil, err } + kvPairs := make([]*types.StoreKVPair, 0, len(messageBytes[1:len(messageBytes)-2])) for i := 1; i < len(messageBytes)-1; i++ { kvPair := new(types.StoreKVPair) @@ -287,6 +294,7 @@ func (sfb *StateFileBackend) formDeliverTxPayload(fileName string, storeKeys []s kvPairs = append(kvPairs, kvPair) } } + return &pb.DeliverTxPayload{ Request: deliverTxReq, Response: deliverTxRes, @@ -304,6 +312,7 @@ func (sfb *StateFileBackend) formEndBlockResponse(fileName string, storeKeys []s return res } res.Height = blockHeight + ebp, err := sfb.formEndBlockPayload(fileName, storeKeys) if err != nil { res.Err = err.Error() @@ -325,14 +334,17 @@ func (sfb *StateFileBackend) formEndBlockPayload(fileName string, storeKeys []st if len(messageBytes) < 2 { return nil, fmt.Errorf("expected at least two protobuf messages, got %d", len(messageBytes)) } + endBlockReq := new(abci.RequestEndBlock) if err := sfb.codec.Unmarshal(messageBytes[0], endBlockReq); err != nil { return nil, err } + endBlockRes := new(abci.ResponseEndBlock) if err := sfb.codec.Unmarshal(messageBytes[len(messageBytes)-1], endBlockRes); err != nil { return nil, err } + kvPairs := make([]*types.StoreKVPair, 0, len(messageBytes[1:len(messageBytes)-2])) for i := 1; i < len(messageBytes)-1; i++ { kvPair := new(types.StoreKVPair) @@ -343,6 +355,7 @@ func (sfb *StateFileBackend) formEndBlockPayload(fileName string, storeKeys []st kvPairs = append(kvPairs, kvPair) } } + return &pb.EndBlockPayload{ Request: endBlockReq, Response: endBlockRes, @@ -365,3 +378,20 @@ func listIsEmptyOrContains(list []string, str string) bool { } return false } + +type filesByTimeModified []os.FileInfo + +// Len satisfies sort.Interface +func (fs filesByTimeModified) Len() int { + return len(fs) +} + +// Swap satisfies sort.Interface +func (fs filesByTimeModified) Swap(i, j int) { + fs[i], fs[j] = fs[j], fs[i] +} + +// Less satisfies sort.Interface +func (fs filesByTimeModified) Less(i, j int) bool { + return fs[i].ModTime().Before(fs[j].ModTime()) +} diff --git a/store/streaming/file/server/grpc/handler.go b/store/streaming/file/server/grpc/handler.go index 0823bcc05a7..9153c48873a 100644 --- a/store/streaming/file/server/grpc/handler.go +++ b/store/streaming/file/server/grpc/handler.go @@ -9,12 +9,13 @@ import ( "github.com/tendermint/tendermint/libs/log" ) +// Handler wraps the StateFileServer interface with an additional Stop() method type Handler interface { pb.StateFileServer Stop() } -// handler is the interface which exposes the StateFile Server methods +// handler is the struct which implements the Handler methods type handler struct { pb.UnimplementedStateFileServer backend *StateFileBackend @@ -23,7 +24,7 @@ type handler struct { } // NewHandler returns the object for the gRPC handler -func NewHandler(conf *config.StateServerConfig, codec *codec.ProtoCodec, logger log.Logger) (Handler, error) { +func NewHandler(conf *config.StateFileServerConfig, codec *codec.ProtoCodec, logger log.Logger) (Handler, error) { quitChan := make(chan struct{}) return &handler{ backend: NewStateFileBackend(conf, codec, logger, quitChan), @@ -32,11 +33,13 @@ func NewHandler(conf *config.StateServerConfig, codec *codec.ProtoCodec, logger }, nil } +// StreamData implements StateFileServer // StreamData streams the requested state file data // this streams new data as it is written to disk func (h *handler) StreamData(req *pb.StreamRequest, srv pb.StateFile_StreamDataServer) error { resChan := make(chan *pb.StreamResponse) - if err := h.backend.StreamData(req, resChan); err != nil { + err, done := h.backend.StreamData(req, resChan) + if err != nil { return err } for { @@ -45,17 +48,23 @@ func (h *handler) StreamData(req *pb.StreamRequest, srv pb.StateFile_StreamDataS if err := srv.Send(res); err != nil { h.logger.Error("StreamData send error", "err", err) } - case <-h.quitChan: + // if Close() is called the backend process will quit and close this channel + // so we don't need a select case for h.quitChan here + // this way we wait for the backend to finish sending before shutting down the handler + case <-done: + h.logger.Info("quiting handler StreamData process") return nil } } } -// BackFillData stream the requested state file data -// this stream data that is already written to disk +// BackFillData implements StateFileServer +// BackFillData streams the requested state file data +// this streams data that is already written to disk func (h *handler) BackFillData(req *pb.StreamRequest, srv pb.StateFile_BackFillDataServer) error { resChan := make(chan *pb.StreamResponse) - if err := h.backend.BackFillData(req, resChan); err != nil { + err, done := h.backend.BackFillData(req, resChan) + if err != nil { return err } for { @@ -64,22 +73,29 @@ func (h *handler) BackFillData(req *pb.StreamRequest, srv pb.StateFile_BackFillD if err := srv.Send(res); err != nil { h.logger.Error("BackFillData send error", "err", err) } - case <-h.quitChan: + // if Close() is called the backend process will quit and close this channel + // so we don't need a select case for h.quitChan here + // this way we wait for the backend to finish sending before shutting down the handler + case <-done: + h.logger.Info("quiting handler BackFillData process") return nil } } } +// BeginBlockDataAt implements StateFileServer // BeginBlockDataAt returns a BeginBlockPayload for the provided BeginBlockRequest func (h *handler) BeginBlockDataAt(ctx context.Context, req *pb.BeginBlockRequest) (*pb.BeginBlockPayload, error) { return h.backend.BeginBlockDataAt(ctx, req) } +// DeliverTxDataAt implements StateFileServer // DeliverTxDataAt returns a DeliverTxPayload for the provided BeginBlockRequest func (h *handler) DeliverTxDataAt(ctx context.Context, req *pb.DeliverTxRequest) (*pb.DeliverTxPayload, error) { return h.backend.DeliverTxDataAt(ctx, req) } +// EndBlockDataAt implements StateFileServer // EndBlockDataAt returns a EndBlockPayload for the provided EndBlockRequest func (h *handler) EndBlockDataAt(ctx context.Context, req *pb.EndBlockRequest) (*pb.EndBlockPayload, error) { return h.backend.EndBlockDataAt(ctx, req)