diff --git a/etc/sources/dirwatch.yaml b/etc/sources/dirwatch.yaml deleted file mode 100644 index 74d9113ff3..0000000000 --- a/etc/sources/dirwatch.yaml +++ /dev/null @@ -1,4 +0,0 @@ -default: - path: /example - allowedExtension: - - txt \ No newline at end of file diff --git a/internal/binder/io/builtin.go b/internal/binder/io/builtin.go index e499f71ecf..7967bad3ab 100644 --- a/internal/binder/io/builtin.go +++ b/internal/binder/io/builtin.go @@ -18,7 +18,6 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/lf-edge/ekuiper/v2/internal/binder" - "github.com/lf-edge/ekuiper/v2/internal/io/dirwatch" "github.com/lf-edge/ekuiper/v2/internal/io/file" "github.com/lf-edge/ekuiper/v2/internal/io/http" "github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver" @@ -38,7 +37,6 @@ func init() { modules.RegisterSource("httppull", func() api.Source { return &http.HttpPullSource{} }) modules.RegisterSource("httppush", func() api.Source { return &http.HttpPushSource{} }) modules.RegisterSource("file", file.GetSource) - modules.RegisterSource("dirwatch", dirwatch.GetSource) modules.RegisterSource("memory", func() api.Source { return memory.GetSource() }) modules.RegisterSource("neuron", neuron.GetSource) modules.RegisterSource("websocket", func() api.Source { return websocket.GetSource() }) diff --git a/internal/io/dirwatch/source.go b/internal/io/dirwatch/source.go deleted file mode 100644 index 9b2bcb0935..0000000000 --- a/internal/io/dirwatch/source.go +++ /dev/null @@ -1,274 +0,0 @@ -// Copyright 2024 EMQ Technologies Co., Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package dirwatch - -import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "sort" - "strings" - "sync" - "time" - - "github.com/fsnotify/fsnotify" - "github.com/lf-edge/ekuiper/contract/v2/api" - - "github.com/lf-edge/ekuiper/v2/internal/conf" - "github.com/lf-edge/ekuiper/v2/pkg/cast" -) - -type FileDirSource struct { - config *FileDirSourceConfig - taskCh chan *FileSourceTask - fileContentCh chan []byte - - watcher *fsnotify.Watcher - rewindMeta *FileDirSourceRewindMeta - wg *sync.WaitGroup -} - -func (f *FileDirSource) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error { - f.wg.Add(2) - go f.startHandleTask(ctx, ingest, ingestError) - go f.handleFileDirNotify(ctx) - return f.readDirFile() -} - -type FileDirSourceConfig struct { - Path string `json:"path"` - AllowedExtension []string `json:"allowedExtension"` -} - -func (f *FileDirSource) Provision(ctx api.StreamContext, configs map[string]any) error { - c := &FileDirSourceConfig{} - if err := cast.MapToStruct(configs, c); err != nil { - return err - } - f.config = c - f.taskCh = make(chan *FileSourceTask, 1024) - f.fileContentCh = make(chan []byte, 1024) - watcher, err := fsnotify.NewWatcher() - if err != nil { - return err - } - f.watcher = watcher - f.rewindMeta = &FileDirSourceRewindMeta{} - if err := f.watcher.Add(f.config.Path); err != nil { - return err - } - conf.Log.Infof("start to watch %v, rule:%v", f.config.Path, ctx.GetRuleId()) - f.wg = &sync.WaitGroup{} - return nil -} - -func (f *FileDirSource) Close(ctx api.StreamContext) error { - f.watcher.Close() - f.wg.Wait() - return nil -} - -func (f *FileDirSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error { - return nil -} - -func (f *FileDirSource) handleFileDirNotify(ctx api.StreamContext) { - defer f.wg.Done() - for { - select { - case <-ctx.Done(): - return - case event, ok := <-f.watcher.Events: - if !ok { - return - } - switch { - case event.Has(fsnotify.Write): - f.taskCh <- &FileSourceTask{ - name: event.Name, - taskType: WriteFile, - } - case event.Has(fsnotify.Create): - f.taskCh <- &FileSourceTask{ - name: event.Name, - taskType: CreateFile, - } - } - case err, ok := <-f.watcher.Errors: - if !ok { - return - } - ctx.GetLogger().Errorf("dirwatch err:%v", err.Error()) - } - } -} - -func (f *FileDirSource) startHandleTask(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) { - defer f.wg.Done() - for { - select { - case task := <-f.taskCh: - switch task.taskType { - case WriteFile, CreateFile: - f.ingestFileContent(ctx, task.name, ingest, ingestError) - } - case <-ctx.Done(): - return - } - } -} - -func (f *FileDirSource) ingestFileContent(ctx api.StreamContext, fileName string, ingest api.TupleIngest, ingestError api.ErrorIngest) { - if !checkFileExtension(fileName, f.config.AllowedExtension) { - return - } - willRead, modifyTime, err := f.checkFileRead(fileName) - if err != nil { - ingestError(ctx, err) - return - } - if willRead { - c, err := os.ReadFile(fileName) - if err != nil { - ingestError(ctx, fmt.Errorf("read file %s err: %v", fileName, err)) - return - } - message := make(map[string]interface{}) - message["filename"] = filepath.Base(fileName) - message["modifyTime"] = modifyTime.Unix() - message["content"] = c - f.updateRewindMeta(fileName, modifyTime) - ingest(ctx, message, nil, time.Now()) - } -} - -func (f *FileDirSource) checkFileRead(fileName string) (bool, time.Time, error) { - fInfo, err := os.Stat(fileName) - if err != nil { - return false, time.Time{}, err - } - if fInfo.IsDir() { - return false, time.Time{}, fmt.Errorf("%s is a directory", fileName) - } - fTime := fInfo.ModTime() - if fTime.After(f.rewindMeta.LastModifyTime) { - return true, fTime, nil - } - return false, time.Time{}, nil -} - -func (f *FileDirSource) updateRewindMeta(_ string, modifyTime time.Time) { - if modifyTime.After(f.rewindMeta.LastModifyTime) { - f.rewindMeta.LastModifyTime = modifyTime - } -} - -func (f *FileDirSource) GetOffset() (any, error) { - c, err := json.Marshal(f.rewindMeta) - return string(c), err -} - -func (f *FileDirSource) Rewind(offset any) error { - c, ok := offset.(string) - if !ok { - return fmt.Errorf("fileDirSource rewind failed") - } - f.rewindMeta = &FileDirSourceRewindMeta{} - if err := json.Unmarshal([]byte(c), f.rewindMeta); err != nil { - return err - } - return nil -} - -func (f *FileDirSource) ResetOffset(input map[string]any) error { - return fmt.Errorf("FileDirSource ResetOffset not supported") -} - -func (f *FileDirSource) readDirFile() error { - entries, err := os.ReadDir(f.config.Path) - if err != nil { - return err - } - files := make(FileWithTimeSlice, 0) - for _, entry := range entries { - if !entry.IsDir() { - fileName := entry.Name() - info, err := entry.Info() - if err != nil { - return err - } - files = append(files, FileWithTime{name: fileName, modifyTime: info.ModTime()}) - } - } - sort.Sort(files) - for _, file := range files { - f.taskCh <- &FileSourceTask{name: filepath.Join(f.config.Path, file.name), taskType: CreateFile} - } - return nil -} - -type FileWithTime struct { - name string - modifyTime time.Time -} - -type FileWithTimeSlice []FileWithTime - -func (f FileWithTimeSlice) Len() int { - return len(f) -} - -func (f FileWithTimeSlice) Less(i, j int) bool { - return f[i].modifyTime.Before(f[j].modifyTime) -} - -func (f FileWithTimeSlice) Swap(i, j int) { - f[i], f[j] = f[j], f[i] -} - -type FileSourceTask struct { - name string - previousName string - taskType FileTaskType -} - -type FileTaskType int - -const ( - CreateFile FileTaskType = iota - WriteFile -) - -type FileDirSourceRewindMeta struct { - LastModifyTime time.Time `json:"lastModifyTime"` -} - -func checkFileExtension(name string, allowedExtension []string) bool { - if len(allowedExtension) < 1 { - return true - } - fileExt := strings.TrimPrefix(filepath.Ext(name), ".") - for _, ext := range allowedExtension { - if fileExt == ext { - return true - } - } - return false -} - -func GetSource() api.Source { - return &FileDirSource{} -} diff --git a/internal/io/dirwatch/source_test.go b/internal/io/dirwatch/source_test.go deleted file mode 100644 index bcd672a20b..0000000000 --- a/internal/io/dirwatch/source_test.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2024 EMQ Technologies Co., Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package dirwatch - -import ( - "encoding/json" - "os" - "testing" - "time" - - "github.com/lf-edge/ekuiper/contract/v2/api" - "github.com/stretchr/testify/require" - - mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" -) - -func TestFileDirSource(t *testing.T) { - f, err := os.Create("./test123.txt") - require.NoError(t, err) - _, err = f.Write([]byte("123")) - require.NoError(t, err) - f.Close() - defer func() { - require.NoError(t, os.Remove("./test123.txt")) - }() - path, err := os.Getwd() - require.NoError(t, err) - fileDirSource := &FileDirSource{} - c := map[string]interface{}{ - "path": path, - "allowedExtension": []string{"txt"}, - } - ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel() - require.NoError(t, fileDirSource.Provision(ctx, c)) - require.NoError(t, fileDirSource.Connect(ctx, nil)) - output := make(chan any, 10) - require.NoError(t, fileDirSource.Subscribe(ctx, func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) { - output <- data - }, func(ctx api.StreamContext, err error) {})) - time.Sleep(10 * time.Millisecond) - got := <-output - gotM, ok := got.(map[string]interface{}) - require.True(t, ok) - require.Equal(t, []byte("123"), gotM["content"]) - offset, err := fileDirSource.GetOffset() - require.NoError(t, err) - meta := &FileDirSourceRewindMeta{} - require.NoError(t, json.Unmarshal([]byte(offset.(string)), meta)) - require.True(t, meta.LastModifyTime.After(time.Time{})) - require.Error(t, fileDirSource.ResetOffset(nil)) - require.NoError(t, fileDirSource.Rewind(offset)) - time.Sleep(10 * time.Millisecond) - cancel() - fileDirSource.Close(ctx) -} - -func TestCheckFileExtension(t *testing.T) { - require.True(t, checkFileExtension("test.txt", []string{})) - require.True(t, checkFileExtension("test.txt", []string{"txt", "jpg"})) - require.False(t, checkFileExtension("test.md", []string{"txt", "jpg"})) -} - -func TestRewind(t *testing.T) { - fileDirSource := &FileDirSource{} - require.Error(t, fileDirSource.Rewind(nil)) - require.Error(t, fileDirSource.Rewind("123")) -} diff --git a/internal/io/file/source.go b/internal/io/file/source.go index e28833af3f..5c355cefa3 100644 --- a/internal/io/file/source.go +++ b/internal/io/file/source.go @@ -17,12 +17,13 @@ package file import ( "bufio" "bytes" + "encoding/json" "errors" "fmt" "io" "os" "path/filepath" - "sync" + "sort" "time" "github.com/lf-edge/ekuiper/contract/v2/api" @@ -42,7 +43,6 @@ type SourceConfig struct { Path string `json:"path"` Interval cast.DurationConf `json:"interval"` IsTable bool `json:"isTable"` - Parallel bool `json:"parallel"` SendInterval cast.DurationConf `json:"sendInterval"` ActionAfterRead int `json:"actionAfterRead"` MoveTo string `json:"moveTo"` @@ -50,6 +50,8 @@ type SourceConfig struct { IgnoreEndLines int `json:"ignoreEndLines"` // Only use for planning Decompression string `json:"decompression"` + // state + rewindMeta *FileDirSourceRewindMeta } // Source load data from file system. @@ -64,6 +66,8 @@ type Source struct { // attach to a reader decorator modules.FileStreamDecorator eof api.EOFIngest + // rewind support state + rewindMeta *FileDirSourceRewindMeta } func (fs *Source) Provision(ctx api.StreamContext, props map[string]any) error { @@ -151,10 +155,11 @@ func (fs *Source) Provision(ctx api.StreamContext, props map[string]any) error { } fs.decorator = decorator } + fs.rewindMeta = &FileDirSourceRewindMeta{} return nil } -func (fs *Source) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error { +func (fs *Source) Connect(_ api.StreamContext, sch api.StatusChangeHandler) error { sch(api.ConnectionConnected, "") return nil } @@ -179,42 +184,59 @@ func (fs *Source) Close(ctx api.StreamContext) error { return nil } +type WithTime struct { + name string + modifyTime time.Time +} + +type WithTimeSlice []WithTime + +func (f WithTimeSlice) Len() int { + return len(f) +} + +func (f WithTimeSlice) Less(i, j int) bool { + return f[i].modifyTime.Before(f[j].modifyTime) +} + +func (f WithTimeSlice) Swap(i, j int) { + f[i], f[j] = f[j], f[i] +} + func (fs *Source) Load(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) { if fs.isDir { - ctx.GetLogger().Debugf("Monitor dir %s", fs.file) + ctx.GetLogger().Debugf("Load dir %s", fs.file) entries, err := os.ReadDir(fs.file) // may be just forget to put in the file if err != nil { ingestError(ctx, err) } - if fs.config.Parallel { - var wg sync.WaitGroup - for _, entry := range entries { - if entry.IsDir() { - continue - } - wg.Add(1) - go func(file string) { - e := infra.SafeRun(func() error { - defer wg.Done() - fs.parseFile(ctx, file, ingest, ingestError) - return nil - }) - if e != nil { - ingestError(ctx, e) - } - }(filepath.Join(fs.file, entry.Name())) + files := make(WithTimeSlice, 0, len(entries)) + for _, entry := range entries { + if entry.IsDir() { + continue } - wg.Wait() - } else { - for _, entry := range entries { - if entry.IsDir() { - continue - } - file := filepath.Join(fs.file, entry.Name()) - fs.parseFile(ctx, file, ingest, ingestError) + fileName := entry.Name() + info, err := entry.Info() + if err != nil { + ctx.GetLogger().Errorf("get file info for %s error: %v", fileName, err) + continue + } + path := filepath.Join(fs.file, fileName) + willRead, _, err := fs.checkFileRead(path) + if err != nil { + ingestError(ctx, err) + return + } + if willRead { + files = append(files, WithTime{name: path, modifyTime: info.ModTime()}) } } + sort.Sort(files) + for _, entry := range files { + fs.parseFile(ctx, entry.name, ingest, ingestError) + fs.updateRewindMeta(entry.name, entry.modifyTime) + } } else { fs.parseFile(ctx, fs.file, ingest, ingestError) } @@ -391,6 +413,54 @@ func (fs *Source) TransformType() api.Source { return fs } +/// Rewind support + +type FileDirSourceRewindMeta struct { + LastModifyTime time.Time `json:"lastModifyTime"` +} + +func (fs *Source) GetOffset() (any, error) { + c, err := json.Marshal(fs.rewindMeta) + return string(c), err +} + +func (fs *Source) Rewind(offset any) error { + c, ok := offset.(string) + if !ok { + return fmt.Errorf("fileDirSource rewind failed") + } + fs.rewindMeta = &FileDirSourceRewindMeta{} + if err := json.Unmarshal([]byte(c), fs.rewindMeta); err != nil { + return err + } + return nil +} + +func (fs *Source) ResetOffset(_ map[string]any) error { + return fmt.Errorf("File source ResetOffset not supported") +} + +func (fs *Source) checkFileRead(fileName string) (bool, time.Time, error) { + fInfo, err := os.Stat(fileName) + if err != nil { + return false, time.Time{}, err + } + if fInfo.IsDir() { + return false, time.Time{}, fmt.Errorf("%s is a directory", fileName) + } + fTime := fInfo.ModTime() + if fTime.After(fs.rewindMeta.LastModifyTime) { + return true, fTime, nil + } + return false, time.Time{}, nil +} + +func (fs *Source) updateRewindMeta(_ string, modifyTime time.Time) { + if modifyTime.After(fs.rewindMeta.LastModifyTime) { + fs.rewindMeta.LastModifyTime = modifyTime + } +} + func GetSource() api.Source { return &Source{} } @@ -401,4 +471,5 @@ var ( // if interval is not set, it uses inotify _ api.Bounded = &Source{} _ model.InfoNode = &Source{} + _ api.Rewindable = &Source{} ) diff --git a/internal/io/file/source_test.go b/internal/io/file/source_test.go index 758081580a..13b10b808f 100644 --- a/internal/io/file/source_test.go +++ b/internal/io/file/source_test.go @@ -366,7 +366,6 @@ func TestIntervalAndDir(t *testing.T) { model.NewDefaultRawTupleIgnoreTs([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta), model.NewDefaultRawTupleIgnoreTs([]byte("{\"id\": 3,\"name\": \"John Smith\"}"), meta), model.NewDefaultRawTupleIgnoreTs([]byte("[{\"id\": 4,\"name\": \"John Smith\"},{\"id\": 5,\"name\": \"John Smith\"}]"), meta), - model.NewDefaultRawTupleIgnoreTs([]byte("{\"id\": 2,\"name\": \"Jane Doe\"}"), meta), } r := GetSource() mock.TestSourceConnector(t, r, map[string]any{ @@ -374,7 +373,6 @@ func TestIntervalAndDir(t *testing.T) { "fileType": "lines", "interval": "1s", "sendInterval": "100ms", - "parallel": true, "ignoreStartLines": 1, // only for test "ignoreTs": true, diff --git a/pkg/mock/test_source.go b/pkg/mock/test_source.go index 841c01389a..5b6c299f89 100644 --- a/pkg/mock/test_source.go +++ b/pkg/mock/test_source.go @@ -16,7 +16,6 @@ package mock import ( "fmt" - "log" "sync" "sync/atomic" "testing" @@ -83,7 +82,7 @@ func TestSourceConnectorCompare(t *testing.T, r api.Source, props map[string]any ) wg.Add(1) ingestErr := func(ctx api.StreamContext, err error) { - log.Println(err) + ctx.GetLogger().Error(err) e = err limit-- if limit == 0 { diff --git a/usage.md b/usage.md deleted file mode 100644 index 107a11825d..0000000000 --- a/usage.md +++ /dev/null @@ -1,57 +0,0 @@ -## Getting Started - -### Create Conf for dirwatch Source - -PUT /metadata/sources/dirwatch/confKeys/watch1 - -```json -{ - "path": "/Users/yisa/Downloads/Github/emqx/ekuiper/_build/watch", - "allowedExtension": ["txt"] -} -``` - -### Create Stream for dirwatch Source - -POST /streams - -```json -{ - "sql":" CREATE stream watch () WITH (TYPE=\"dirwatch\",CONF_KEY=\"watch1\");" -} -``` - -### Create Rule for dirwatch Source - -POST /rules - -```json -{ - "id": "rule1", - "sql": "SELECT * from watch", - "actions": [ - { - "log": { - } - } - ], - "options": { - "qos":1, - "checkpointInterval": "1s" - } -} -``` - -### Create File in dir - -Create test.txt file in Dir - -```txt -123 -``` - -Recv Sink like below: - -```json -{"content":"MTIz","filename":"test.txt","modifyTime":1732241987} -```