diff --git a/etc/sources/dirwatch.yaml b/etc/sources/dirwatch.yaml deleted file mode 100644 index 74d9113ff3..0000000000 --- a/etc/sources/dirwatch.yaml +++ /dev/null @@ -1,4 +0,0 @@ -default: - path: /example - allowedExtension: - - txt \ No newline at end of file diff --git a/internal/binder/io/builtin.go b/internal/binder/io/builtin.go index e499f71ecf..7967bad3ab 100644 --- a/internal/binder/io/builtin.go +++ b/internal/binder/io/builtin.go @@ -18,7 +18,6 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/lf-edge/ekuiper/v2/internal/binder" - "github.com/lf-edge/ekuiper/v2/internal/io/dirwatch" "github.com/lf-edge/ekuiper/v2/internal/io/file" "github.com/lf-edge/ekuiper/v2/internal/io/http" "github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver" @@ -38,7 +37,6 @@ func init() { modules.RegisterSource("httppull", func() api.Source { return &http.HttpPullSource{} }) modules.RegisterSource("httppush", func() api.Source { return &http.HttpPushSource{} }) modules.RegisterSource("file", file.GetSource) - modules.RegisterSource("dirwatch", dirwatch.GetSource) modules.RegisterSource("memory", func() api.Source { return memory.GetSource() }) modules.RegisterSource("neuron", neuron.GetSource) modules.RegisterSource("websocket", func() api.Source { return websocket.GetSource() }) diff --git a/internal/io/dirwatch/source.go b/internal/io/dirwatch/source.go deleted file mode 100644 index 9b2bcb0935..0000000000 --- a/internal/io/dirwatch/source.go +++ /dev/null @@ -1,274 +0,0 @@ -// Copyright 2024 EMQ Technologies Co., Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package dirwatch - -import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "sort" - "strings" - "sync" - "time" - - "github.com/fsnotify/fsnotify" - "github.com/lf-edge/ekuiper/contract/v2/api" - - "github.com/lf-edge/ekuiper/v2/internal/conf" - "github.com/lf-edge/ekuiper/v2/pkg/cast" -) - -type FileDirSource struct { - config *FileDirSourceConfig - taskCh chan *FileSourceTask - fileContentCh chan []byte - - watcher *fsnotify.Watcher - rewindMeta *FileDirSourceRewindMeta - wg *sync.WaitGroup -} - -func (f *FileDirSource) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error { - f.wg.Add(2) - go f.startHandleTask(ctx, ingest, ingestError) - go f.handleFileDirNotify(ctx) - return f.readDirFile() -} - -type FileDirSourceConfig struct { - Path string `json:"path"` - AllowedExtension []string `json:"allowedExtension"` -} - -func (f *FileDirSource) Provision(ctx api.StreamContext, configs map[string]any) error { - c := &FileDirSourceConfig{} - if err := cast.MapToStruct(configs, c); err != nil { - return err - } - f.config = c - f.taskCh = make(chan *FileSourceTask, 1024) - f.fileContentCh = make(chan []byte, 1024) - watcher, err := fsnotify.NewWatcher() - if err != nil { - return err - } - f.watcher = watcher - f.rewindMeta = &FileDirSourceRewindMeta{} - if err := f.watcher.Add(f.config.Path); err != nil { - return err - } - conf.Log.Infof("start to watch %v, rule:%v", f.config.Path, ctx.GetRuleId()) - f.wg = &sync.WaitGroup{} - return nil -} - -func (f *FileDirSource) Close(ctx api.StreamContext) error { - f.watcher.Close() - f.wg.Wait() - return nil -} - -func (f *FileDirSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error { - return nil -} - -func (f *FileDirSource) handleFileDirNotify(ctx api.StreamContext) { - defer f.wg.Done() - for { - select { - case <-ctx.Done(): - return - case event, ok := <-f.watcher.Events: - if !ok { - return - } - switch { - case event.Has(fsnotify.Write): - f.taskCh <- &FileSourceTask{ - name: event.Name, - taskType: WriteFile, - } - case event.Has(fsnotify.Create): - f.taskCh <- &FileSourceTask{ - name: event.Name, - taskType: CreateFile, - } - } - case err, ok := <-f.watcher.Errors: - if !ok { - return - } - ctx.GetLogger().Errorf("dirwatch err:%v", err.Error()) - } - } -} - -func (f *FileDirSource) startHandleTask(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) { - defer f.wg.Done() - for { - select { - case task := <-f.taskCh: - switch task.taskType { - case WriteFile, CreateFile: - f.ingestFileContent(ctx, task.name, ingest, ingestError) - } - case <-ctx.Done(): - return - } - } -} - -func (f *FileDirSource) ingestFileContent(ctx api.StreamContext, fileName string, ingest api.TupleIngest, ingestError api.ErrorIngest) { - if !checkFileExtension(fileName, f.config.AllowedExtension) { - return - } - willRead, modifyTime, err := f.checkFileRead(fileName) - if err != nil { - ingestError(ctx, err) - return - } - if willRead { - c, err := os.ReadFile(fileName) - if err != nil { - ingestError(ctx, fmt.Errorf("read file %s err: %v", fileName, err)) - return - } - message := make(map[string]interface{}) - message["filename"] = filepath.Base(fileName) - message["modifyTime"] = modifyTime.Unix() - message["content"] = c - f.updateRewindMeta(fileName, modifyTime) - ingest(ctx, message, nil, time.Now()) - } -} - -func (f *FileDirSource) checkFileRead(fileName string) (bool, time.Time, error) { - fInfo, err := os.Stat(fileName) - if err != nil { - return false, time.Time{}, err - } - if fInfo.IsDir() { - return false, time.Time{}, fmt.Errorf("%s is a directory", fileName) - } - fTime := fInfo.ModTime() - if fTime.After(f.rewindMeta.LastModifyTime) { - return true, fTime, nil - } - return false, time.Time{}, nil -} - -func (f *FileDirSource) updateRewindMeta(_ string, modifyTime time.Time) { - if modifyTime.After(f.rewindMeta.LastModifyTime) { - f.rewindMeta.LastModifyTime = modifyTime - } -} - -func (f *FileDirSource) GetOffset() (any, error) { - c, err := json.Marshal(f.rewindMeta) - return string(c), err -} - -func (f *FileDirSource) Rewind(offset any) error { - c, ok := offset.(string) - if !ok { - return fmt.Errorf("fileDirSource rewind failed") - } - f.rewindMeta = &FileDirSourceRewindMeta{} - if err := json.Unmarshal([]byte(c), f.rewindMeta); err != nil { - return err - } - return nil -} - -func (f *FileDirSource) ResetOffset(input map[string]any) error { - return fmt.Errorf("FileDirSource ResetOffset not supported") -} - -func (f *FileDirSource) readDirFile() error { - entries, err := os.ReadDir(f.config.Path) - if err != nil { - return err - } - files := make(FileWithTimeSlice, 0) - for _, entry := range entries { - if !entry.IsDir() { - fileName := entry.Name() - info, err := entry.Info() - if err != nil { - return err - } - files = append(files, FileWithTime{name: fileName, modifyTime: info.ModTime()}) - } - } - sort.Sort(files) - for _, file := range files { - f.taskCh <- &FileSourceTask{name: filepath.Join(f.config.Path, file.name), taskType: CreateFile} - } - return nil -} - -type FileWithTime struct { - name string - modifyTime time.Time -} - -type FileWithTimeSlice []FileWithTime - -func (f FileWithTimeSlice) Len() int { - return len(f) -} - -func (f FileWithTimeSlice) Less(i, j int) bool { - return f[i].modifyTime.Before(f[j].modifyTime) -} - -func (f FileWithTimeSlice) Swap(i, j int) { - f[i], f[j] = f[j], f[i] -} - -type FileSourceTask struct { - name string - previousName string - taskType FileTaskType -} - -type FileTaskType int - -const ( - CreateFile FileTaskType = iota - WriteFile -) - -type FileDirSourceRewindMeta struct { - LastModifyTime time.Time `json:"lastModifyTime"` -} - -func checkFileExtension(name string, allowedExtension []string) bool { - if len(allowedExtension) < 1 { - return true - } - fileExt := strings.TrimPrefix(filepath.Ext(name), ".") - for _, ext := range allowedExtension { - if fileExt == ext { - return true - } - } - return false -} - -func GetSource() api.Source { - return &FileDirSource{} -} diff --git a/internal/io/dirwatch/source_test.go b/internal/io/dirwatch/source_test.go deleted file mode 100644 index bcd672a20b..0000000000 --- a/internal/io/dirwatch/source_test.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2024 EMQ Technologies Co., Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package dirwatch - -import ( - "encoding/json" - "os" - "testing" - "time" - - "github.com/lf-edge/ekuiper/contract/v2/api" - "github.com/stretchr/testify/require" - - mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" -) - -func TestFileDirSource(t *testing.T) { - f, err := os.Create("./test123.txt") - require.NoError(t, err) - _, err = f.Write([]byte("123")) - require.NoError(t, err) - f.Close() - defer func() { - require.NoError(t, os.Remove("./test123.txt")) - }() - path, err := os.Getwd() - require.NoError(t, err) - fileDirSource := &FileDirSource{} - c := map[string]interface{}{ - "path": path, - "allowedExtension": []string{"txt"}, - } - ctx, cancel := mockContext.NewMockContext("1", "2").WithCancel() - require.NoError(t, fileDirSource.Provision(ctx, c)) - require.NoError(t, fileDirSource.Connect(ctx, nil)) - output := make(chan any, 10) - require.NoError(t, fileDirSource.Subscribe(ctx, func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) { - output <- data - }, func(ctx api.StreamContext, err error) {})) - time.Sleep(10 * time.Millisecond) - got := <-output - gotM, ok := got.(map[string]interface{}) - require.True(t, ok) - require.Equal(t, []byte("123"), gotM["content"]) - offset, err := fileDirSource.GetOffset() - require.NoError(t, err) - meta := &FileDirSourceRewindMeta{} - require.NoError(t, json.Unmarshal([]byte(offset.(string)), meta)) - require.True(t, meta.LastModifyTime.After(time.Time{})) - require.Error(t, fileDirSource.ResetOffset(nil)) - require.NoError(t, fileDirSource.Rewind(offset)) - time.Sleep(10 * time.Millisecond) - cancel() - fileDirSource.Close(ctx) -} - -func TestCheckFileExtension(t *testing.T) { - require.True(t, checkFileExtension("test.txt", []string{})) - require.True(t, checkFileExtension("test.txt", []string{"txt", "jpg"})) - require.False(t, checkFileExtension("test.md", []string{"txt", "jpg"})) -} - -func TestRewind(t *testing.T) { - fileDirSource := &FileDirSource{} - require.Error(t, fileDirSource.Rewind(nil)) - require.Error(t, fileDirSource.Rewind("123")) -} diff --git a/usage.md b/usage.md deleted file mode 100644 index 107a11825d..0000000000 --- a/usage.md +++ /dev/null @@ -1,57 +0,0 @@ -## Getting Started - -### Create Conf for dirwatch Source - -PUT /metadata/sources/dirwatch/confKeys/watch1 - -```json -{ - "path": "/Users/yisa/Downloads/Github/emqx/ekuiper/_build/watch", - "allowedExtension": ["txt"] -} -``` - -### Create Stream for dirwatch Source - -POST /streams - -```json -{ - "sql":" CREATE stream watch () WITH (TYPE=\"dirwatch\",CONF_KEY=\"watch1\");" -} -``` - -### Create Rule for dirwatch Source - -POST /rules - -```json -{ - "id": "rule1", - "sql": "SELECT * from watch", - "actions": [ - { - "log": { - } - } - ], - "options": { - "qos":1, - "checkpointInterval": "1s" - } -} -``` - -### Create File in dir - -Create test.txt file in Dir - -```txt -123 -``` - -Recv Sink like below: - -```json -{"content":"MTIz","filename":"test.txt","modifyTime":1732241987} -```