Skip to content

Commit

Permalink
refactor: support plugins to be built in (lf-edge#2295)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com>
Co-authored-by: Rory Z <16801068+Rory-Z@users.noreply.github.com>
  • Loading branch information
ngjaying and Rory-Z committed Oct 9, 2023
1 parent 9ef75df commit 6a9171c
Show file tree
Hide file tree
Showing 40 changed files with 2,350 additions and 1,918 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_packages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
--env KUIPER_SOURCE='/ekuiper' \
--platform ${{ matrix.arch }} \
ghcr.io/lf-edge/ekuiper/base:${{ matrix.golang }}-${{ matrix.os }} \
bash -euc "git config --global --add safe.directory /ekuiper && make pkg && make pkg_core && .github/scripts/test.sh"
bash -euc "git config --global --add safe.directory /ekuiper && make pkg && make pkg_core && make pkg_full && .github/scripts/test.sh"
- name: build
if: matrix.os == 'centos'
run: |
Expand Down
35 changes: 35 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,33 @@ build_core: build_prepare
@mv ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
@echo "Build successfully"

PLUGINS_IN_FULL := \
extensions/sinks/influx \
extensions/sinks/influx2 \
extensions/sinks/zmq \
extensions/sinks/kafka \
extensions/sinks/image \
extensions/sinks/sql \
extensions/sources/random \
extensions/sources/zmq \
extensions/sources/sql \
extensions/sources/video

.PHONY: build_full
build_full: SHELL:=/bin/bash -euo pipefail
build_full: build_prepare
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "full include_nats_messaging" -o kuiperd cmd/kuiperd/main.go
@if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi
@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
@while read plugin; do \
while read line; do \
type=$$(echo $$(dirname $$line) | cut -d'/' -f2); \
cp -r $$line $(BUILD_PATH)/$(PACKAGE_NAME)/etc/$$type/$$(basename $$line); \
done < <(find $$plugin -type f \( -name "*.json" -o -name "*.yaml" \)); \
done < <(echo $(PLUGINS_IN_FULL))
@echo "Build successfully"

.PHONY: pkg_core
pkg_core: build_core
@mkdir -p $(PACKAGES_PATH)
Expand All @@ -84,6 +111,14 @@ pkg_core: build_core
@mv $(BUILD_PATH)/$(PACKAGE_NAME)-core.zip $(BUILD_PATH)/$(PACKAGE_NAME)-core.tar.gz $(PACKAGES_PATH)
@echo "Package core success"

.PHONY: pkg_full
pkg_full: build_full
@mkdir -p $(PACKAGES_PATH)
@cd $(BUILD_PATH) && zip -rq $(PACKAGE_NAME)-full.zip $(PACKAGE_NAME)
@cd $(BUILD_PATH) && tar -czf $(PACKAGE_NAME)-full.tar.gz $(PACKAGE_NAME)
@mv $(BUILD_PATH)/$(PACKAGE_NAME)-full.zip $(BUILD_PATH)/$(PACKAGE_NAME)-full.tar.gz $(PACKAGES_PATH)
@echo "Package full success"

.PHONY: real_pkg
real_pkg:
@mkdir -p $(PACKAGES_PATH)
Expand Down
2 changes: 1 addition & 1 deletion build-plugins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ build(){
;;
* )
go build -trimpath --buildmode=plugin -o extensions/$PLUGIN_TYPE/$PLUGIN_NAME/$PLUGIN_NAME@$VERSION.so extensions/$PLUGIN_TYPE/$PLUGIN_NAME/*.go
;;
;;
esac
}

Expand Down
217 changes: 217 additions & 0 deletions extensions/sinks/image/ext/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package image

import (
"bytes"
"context"
"fmt"
"image/jpeg"
"image/png"
"os"
"path/filepath"
"strings"
"time"

"github.com/lf-edge/ekuiper/pkg/api"
)

type imageSink struct {
path string
format string
maxAge int
maxCount int
cancel context.CancelFunc
}

func (m *imageSink) Configure(props map[string]interface{}) error {
if i, ok := props["imageFormat"]; ok {
if i, ok := i.(string); ok {
if "png" != i && "jpeg" != i {
return fmt.Errorf("%s image type is not currently supported", i)
}
m.format = i
}
} else {
return fmt.Errorf("Field not found format.")
}

if i, ok := props["path"]; ok {
if ii, ok := i.(string); ok {
m.path = ii
} else {
return fmt.Errorf("%v image type is not supported", i)
}
} else {
return fmt.Errorf("Field not found path.")
}

m.maxAge = 72
if i, ok := props["maxAge"]; ok {
if i, ok := i.(int); ok {
m.maxAge = i
}
}
m.maxCount = 1000
if i, ok := props["maxCount"]; ok {
if i, ok := i.(int); ok {
m.maxCount = i
}
}
return nil
}

func (m *imageSink) Open(ctx api.StreamContext) error {
logger := ctx.GetLogger()
logger.Debug("Opening image sink")

if _, err := os.Stat(m.path); os.IsNotExist(err) {
if err := os.MkdirAll(m.path, os.ModePerm); nil != err {
return fmt.Errorf("fail to open image sink for %v", err)
}
}

t := time.NewTicker(time.Duration(3) * time.Minute)
exeCtx, cancel := ctx.WithCancel()
m.cancel = cancel
go func() {
defer t.Stop()
for {
select {
case <-t.C:
m.delFile(logger)
case <-exeCtx.Done():
logger.Info("image sink done")
return
}
}
}()
return nil
}

func (m *imageSink) delFile(logger api.Logger) error {
dirEntries, err := os.ReadDir(m.path)
if nil != err || 0 == len(dirEntries) {
return err
}

files := make([]os.FileInfo, 0, len(dirEntries))
for _, entry := range dirEntries {
info, err := entry.Info()
if err != nil {
continue
}
files = append(files, info)
}

pos := m.maxCount
delTime := time.Now().Add(time.Duration(0-m.maxAge) * time.Hour)
for i := 0; i < len(files); i++ {
for j := i + 1; j < len(files); j++ {
if files[i].ModTime().Before(files[j].ModTime()) {
files[i], files[j] = files[j], files[i]
}
}
if files[i].ModTime().Before(delTime) && i < pos {
pos = i
break
}
}

for i := pos; i < len(files); i++ {
fname := files[i].Name()
if strings.HasSuffix(fname, m.format) {
fpath := filepath.Join(m.path, fname)
os.Remove(fpath)
}
}
return nil
}

func (m *imageSink) getSuffix() string {
now := time.Now()
year, month, day := now.Date()
hour, minute, second := now.Clock()
nsecond := now.Nanosecond()
return fmt.Sprintf(`%d-%d-%d_%d-%d-%d-%d`, year, month, day, hour, minute, second, nsecond)
}

func (m *imageSink) saveFile(b []byte, fpath string) error {
reader := bytes.NewReader(b)
fp, err := os.Create(fpath)
if nil != err {
return err
}
defer fp.Close()
if "png" == m.format {
if img, err := png.Decode(reader); nil != err {
return err
} else if err = png.Encode(fp, img); nil != err {
return err
}
} else if "jpeg" == m.format {
if img, err := jpeg.Decode(reader); nil != err {
return err
} else if err = jpeg.Encode(fp, img, nil); nil != err {
return err
}
}
return nil
}

func (m *imageSink) saveFiles(images map[string]interface{}) error {
for k, v := range images {
image, ok := v.([]byte)
if !ok {
return fmt.Errorf("found none bytes data %v for path %s", image, k)
}
suffix := m.getSuffix()
fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.format)
fpath := filepath.Join(m.path, fname)
m.saveFile(image, fpath)
}
return nil
}

func (m *imageSink) Collect(ctx api.StreamContext, item interface{}) error {
logger := ctx.GetLogger()
switch v := item.(type) {
case []map[string]interface{}:
var outer error
for _, vm := range v {
err := m.saveFiles(vm)
if err != nil {
outer = err
logger.Error(err)
}
}
return outer
case map[string]interface{}:
return m.saveFiles(v)
default:
return fmt.Errorf("image sink receive invalid data %v", item)
}
}

func (m *imageSink) Close(ctx api.StreamContext) error {
if m.cancel != nil {
m.cancel()
}
return m.delFile(ctx.GetLogger())
}

func GetSink() api.Sink {
return &imageSink{}
}
Loading

0 comments on commit 6a9171c

Please sign in to comment.