Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support manage connection #2867

Merged
merged 24 commits into from
May 27, 2024
Merged
258 changes: 258 additions & 0 deletions internal/io/connection/connection_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connection

import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/pkg/store"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/pkg/kv"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
)

var isTest bool

func GetAllConnectionsID() []string {
globalConnectionManager.RLock()
defer globalConnectionManager.RUnlock()
ids := make([]string, 0)
for key := range globalConnectionManager.connectionPool {
ids = append(ids, key)
}
return ids
}

func PingConnection(ctx api.StreamContext, id string) error {
conn, err := GetNameConnection(id)
if err != nil {
return err
}
return conn.Ping(ctx)
}

func GetNameConnection(selId string) (modules.Connection, error) {
if selId == "" {
return nil, fmt.Errorf("connection id should be defined")
}
globalConnectionManager.RLock()
defer globalConnectionManager.RUnlock()
meta, ok := globalConnectionManager.connectionPool[selId]
if !ok {
return nil, fmt.Errorf("connection %s not existed", selId)
}
return meta.conn, nil
}

func CreateNamedConnection(ctx api.StreamContext, id, typ string, props map[string]any) (modules.Connection, error) {
if id == "" || typ == "" {
return nil, fmt.Errorf("connection id and type should be defined")
}
globalConnectionManager.Lock()
defer globalConnectionManager.Unlock()
_, ok := globalConnectionManager.connectionPool[id]
if ok {
return nil, fmt.Errorf("connection %v already been created", id)
}
meta := ConnectionMeta{
ID: id,
Typ: typ,
Props: props,
}
if !isTest {
b, err := json.Marshal(meta)
if err != nil {
return nil, err

Check warning on line 81 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L79-L81

Added lines #L79 - L81 were not covered by tests
}
if err := globalConnectionManager.store.Set(id, string(b)); err != nil {
return nil, err

Check warning on line 84 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L83-L84

Added lines #L83 - L84 were not covered by tests
}
}
conn, err := createNamedConnection(ctx, meta)
if err != nil {
return nil, err
}
meta.conn = conn
globalConnectionManager.connectionPool[id] = meta
return conn, nil
}

func CreateNonStoredConnection(ctx api.StreamContext, id, typ string, props map[string]any) (modules.Connection, error) {
if id == "" || typ == "" {
return nil, fmt.Errorf("connection id and type should be defined")
}
globalConnectionManager.Lock()
defer globalConnectionManager.Unlock()
_, ok := globalConnectionManager.connectionPool[id]
if ok {
return nil, fmt.Errorf("connection %v already been created", id)
}
meta := ConnectionMeta{
ID: id,
Typ: typ,
Props: props,
}
conn, err := createNamedConnection(ctx, meta)
if err != nil {
return nil, err

Check warning on line 113 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L113

Added line #L113 was not covered by tests
}
meta.conn = conn
globalConnectionManager.connectionPool[id] = meta
return conn, nil
}

func DropNonStoredConnection(ctx api.StreamContext, selId string) error {
if selId == "" {
return fmt.Errorf("connection id should be defined")
}
globalConnectionManager.Lock()
defer globalConnectionManager.Unlock()
meta, ok := globalConnectionManager.connectionPool[selId]
if !ok {
return nil
}
conn := meta.conn
conn.Close(ctx)
delete(globalConnectionManager.connectionPool, selId)
return nil
}

func createNamedConnection(ctx api.StreamContext, meta ConnectionMeta) (modules.Connection, error) {
var conn modules.Connection
var err error
connRegister, ok := modules.ConnectionRegister[strings.ToLower(meta.Typ)]
if !ok {
return nil, fmt.Errorf("unknown connection type")
}
conn, err = connRegister(ctx, meta.ID, meta.Props)
if err != nil {
return nil, err

Check warning on line 145 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L145

Added line #L145 was not covered by tests
}
return conn, nil
}

func DropNameConnection(ctx api.StreamContext, selId string) error {
if selId == "" {
return fmt.Errorf("connection id should be defined")
}
globalConnectionManager.Lock()
defer globalConnectionManager.Unlock()
meta, ok := globalConnectionManager.connectionPool[selId]
if !ok {
return nil
}
conn := meta.conn
if conn.Ref(ctx) > 0 {
return fmt.Errorf("connection %s can't be dropped due to reference", selId)
}
if !isTest {
err := globalConnectionManager.store.Delete(selId)
if err != nil {
return fmt.Errorf("drop connection %s failed, err:%v", selId, err)

Check warning on line 167 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L165-L167

Added lines #L165 - L167 were not covered by tests
}
}
conn.Close(ctx)
delete(globalConnectionManager.connectionPool, selId)
return nil
}

var globalConnectionManager *ConnectionManager

func InitConnectionManagerInTest() {
isTest = true
InitConnectionManager()
}

func InitConnectionManager() error {
globalConnectionManager = &ConnectionManager{
connectionPool: make(map[string]ConnectionMeta),
}
if !isTest {
globalConnectionManager.store, _ = store.GetKV("connectionMeta")
kvs, _ := globalConnectionManager.store.All()
for connectionID, raw := range kvs {
meta := ConnectionMeta{}
err := json.Unmarshal([]byte(raw), &meta)
if err != nil {
return fmt.Errorf("initialize connection:%v failed, err:%v", connectionID, err)

Check warning on line 193 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L187-L193

Added lines #L187 - L193 were not covered by tests
}
conn, err := createNamedConnection(context.Background(), meta)
if err != nil {
return fmt.Errorf("initialize connection:%v failed, err:%v", connectionID, err)

Check warning on line 197 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L195-L197

Added lines #L195 - L197 were not covered by tests
}
meta.conn = conn
globalConnectionManager.connectionPool[connectionID] = meta

Check warning on line 200 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L199-L200

Added lines #L199 - L200 were not covered by tests
}
}
return nil
}

type ConnectionManager struct {
sync.RWMutex
store kv.KeyValue
ngjaying marked this conversation as resolved.
Show resolved Hide resolved
connectionPool map[string]ConnectionMeta
}

type ConnectionMeta struct {
ID string `json:"id"`
Typ string `json:"typ"`
Props map[string]any `json:"props"`
conn modules.Connection `json:"-"`
}

type mockConnection struct {
ngjaying marked this conversation as resolved.
Show resolved Hide resolved
id string
ref int
}

func (m *mockConnection) Ping(ctx api.StreamContext) error {
return nil
}

func (m *mockConnection) Close(ctx api.StreamContext) {
return
}

func (m *mockConnection) Attach(ctx api.StreamContext) {
m.ref++
return
}

func (m *mockConnection) DetachSub(ctx api.StreamContext, props map[string]any) {
m.ref--
return
}

func (m *mockConnection) DetachPub(ctx api.StreamContext, props map[string]any) {
m.ref--
return
}

func (m *mockConnection) Ref(ctx api.StreamContext) int {
return m.ref
}

func CreateMockConnection(ctx api.StreamContext, id string, props map[string]any) (modules.Connection, error) {
m := &mockConnection{id: id, ref: 0}
return m, nil
}

func init() {
modules.ConnectionRegister["mock"] = CreateMockConnection
}
78 changes: 78 additions & 0 deletions internal/io/connection/connection_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connection

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/topo/context"
)

func TestConnection(t *testing.T) {
InitConnectionManagerInTest()
ctx := context.Background()
conn, err := CreateNamedConnection(ctx, "id1", "mock", nil)
require.NoError(t, err)
require.NotNil(t, conn)
require.NoError(t, conn.Ping(ctx))
_, err = CreateNamedConnection(ctx, "id1", "mock", nil)
require.Error(t, err)
require.Equal(t, 0, conn.Ref(ctx))
conn.Attach(ctx)
require.Equal(t, 1, conn.Ref(ctx))
conn.Attach(ctx)
require.Equal(t, 2, conn.Ref(ctx))
conn.DetachPub(ctx, nil)
require.Equal(t, 1, conn.Ref(ctx))
err = DropNameConnection(ctx, "id1")
require.Error(t, err)
conn2, err := GetNameConnection("id1")
require.NoError(t, err)
require.NotNil(t, conn2)
conn.DetachSub(ctx, nil)
require.Equal(t, 0, conn.Ref(ctx))
err = DropNameConnection(ctx, "id1")
require.NoError(t, err)
err = DropNameConnection(ctx, "id1")
require.NoError(t, err)
conn3, err := GetNameConnection("id1")
require.Error(t, err)
require.Nil(t, conn3)

_, err = CreateNamedConnection(ctx, "", "mock", nil)
require.Error(t, err)
err = DropNameConnection(ctx, "")
require.Error(t, err)
_, err = CreateNamedConnection(ctx, "12", "unknown", nil)
require.Error(t, err)
_, err = GetNameConnection("")
require.Error(t, err)
err = PingConnection(ctx, "")
require.Error(t, err)
_, err = CreateNonStoredConnection(ctx, "", "mock", nil)
require.Error(t, err)

conn4, err := CreateNonStoredConnection(ctx, "id2", "mock", nil)
require.NoError(t, err)
require.NotNil(t, conn4)
_, err = CreateNonStoredConnection(ctx, "id2", "mock", nil)
require.Error(t, err)
err = DropNonStoredConnection(ctx, "")
require.Error(t, err)
err = DropNonStoredConnection(ctx, "nonexists")
require.NoError(t, err)
}
Loading
Loading