Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrated Workflows From In Memory to Db #33

Merged
merged 2 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions data/scripts/def.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,15 @@ CREATE TABLE IF NOT EXISTS api_keys (
FOREIGN KEY(user_ID) REFERENCES users(id)
);

CREATE TABLE IF NOT EXISTS workflows (
id INTEGER NOT NULL PRIMARY KEY,
topic_name TEXT NOT NULL,
offset INTEGER NOT NULL,
function_name TEXT NOT NULL,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
sink_url TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);


4 changes: 2 additions & 2 deletions persistence/apiKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ func (dstore *Datastore) GetAPIKeysByUserID(userID int) ([]ApiKey, error) {
if err != nil {
return nil, err
}
defer rows.Close()

var apiKeys []ApiKey
for rows.Next() {
var apiKey ApiKey
err := rows.Scan(&apiKey.Id, &apiKey.Key, &apiKey.CreatedAt)
if err != nil {
rows.Close()
return nil, err
}
apiKeys = append(apiKeys, apiKey)
}
rows.Close()
return apiKeys, nil
}
2 changes: 1 addition & 1 deletion persistence/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func setupConnection(isDevMode bool) (*Datastore, error) {
var db_file string

if isDevMode {
db_file = ":memory:"
db_file = "file::memory:?cache=shared"
} else {
db_file = os.Getenv("DB_FILE_LOCATION")
}
Expand Down
5 changes: 3 additions & 2 deletions persistence/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,15 @@ func (dstore *Datastore) GcSessions(maxlifetime int64) error {
deleteQuery := "DELETE FROM sessions ID=?;"
dstore.RWMutex.Lock()
rows, err := dstore.db.Query(selectQuery)
dstore.RWMutex.Unlock()
bdkiran marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var sdb SessionDB
err = rows.Scan(&sdb.ID, &sdb.TimeAccessed)
if err != nil {
rows.Close()
return err
}
if (sdb.TimeAccessed.Unix() + maxlifetime) < time.Now().Unix() {
Expand All @@ -233,7 +234,7 @@ func (dstore *Datastore) GcSessions(maxlifetime int64) error {
}
}
}
dstore.RWMutex.Unlock()
rows.Close()
bdkiran marked this conversation as resolved.
Show resolved Hide resolved
err = rows.Err()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions persistence/stickyConnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func (dstore *Datastore) GetAllEndpoints(userId int) ([]Endpoint, error) {
if err != nil {
return stickey_connections, err
}
defer rows.Close()

for rows.Next() {
var stickey_connection Endpoint
err := rows.Scan(&stickey_connection.RouteID, &stickey_connection.Security_key, &stickey_connection.TopicName, &stickey_connection.LastModified)
if err != nil {
rows.Close()
return stickey_connections, err
}
stickey_connections = append(stickey_connections, stickey_connection)
}
rows.Close()
return stickey_connections, nil
}

Expand Down
3 changes: 2 additions & 1 deletion persistence/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ func (dstore *Datastore) getAllUsers() ([]User, error) {
if err != nil {
return users, err
}
defer rows.Close()
for rows.Next() {
var user User
err := rows.Scan(&user.ID, &user.Email, &user.AuthType, &user.CreatedAt, &user.LastModified)
if err != nil {
rows.Close()
return users, err
}
users = append(users, user)
}
rows.Close()
return users, nil
}

Expand Down
155 changes: 155 additions & 0 deletions persistence/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package persistence

import (
"errors"
"reflect"
"time"
)

type Workflow struct {
Id int `json:"id"`
TopicName string `json:"topicName"`
Offset int `json:"offset"`
FunctionName string `json:"functionName"`
Enabled bool `json:"enabled"`
SinkURL string `json:"sinkURL"`
LastModified time.Time `json:"lastModified,omitempty"`
}

type funcMap map[string]interface{}

var FUNC_MAP = funcMap{}
bdkiran marked this conversation as resolved.
Show resolved Hide resolved

func (workflow Workflow) Call(params ...interface{}) (result interface{}, err error) {
f := reflect.ValueOf(FUNC_MAP[workflow.FunctionName])
if len(params) != f.Type().NumIn() {
err = errors.New("the number of params is out of index")
return
}
in := make([]reflect.Value, len(params))
for k, param := range params {
in[k] = reflect.ValueOf(param)
}
res := f.Call(in)
result = res[0].Interface()
return
}

func (dstore *Datastore) GetWorkflow(id int) (Workflow, error) {
selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows WHERE id=?;"
dstore.RWMutex.RLock()
row := dstore.db.QueryRow(selectQuery, id)
dstore.RWMutex.RUnlock()

var workflow Workflow

err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
if err != nil {
return workflow, err
}
return workflow, nil
}

func (dstore *Datastore) GetWorkflows() ([]Workflow, error) {
workflows := []Workflow{}
selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows;"
dstore.RWMutex.RLock()
rows, err := dstore.db.Query(selectQuery)
dstore.RWMutex.RUnlock()
if err != nil {
return workflows, err
}
for rows.Next() {
var workflow Workflow
err := rows.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
if err != nil {
rows.Close()
return workflows, err
}
workflows = append(workflows, workflow)
}
rows.Close()
err = rows.Err()
if err != nil {
return workflows, err
}
return workflows, nil
}

func (dstore *Datastore) GetEnabledWorkflows() ([]Workflow, error) {
workflows := []Workflow{}
selectQuery := "SELECT id, topic_name, offset, function_name, enabled, sink_url, last_modified FROM workflows WHERE enabled=true;"
dstore.RWMutex.RLock()
rows, err := dstore.db.Query(selectQuery)
dstore.RWMutex.RUnlock()
if err != nil {
return workflows, err
}
for rows.Next() {
var workflow Workflow
err := rows.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
if err != nil {
rows.Close()
return workflows, err
}
workflows = append(workflows, workflow)
}
rows.Close()
return workflows, nil
}

func (dstore *Datastore) InsertWorkflow(workflow Workflow) (time.Time, error) {
insertQuery := `
INSERT INTO workflows(id, topic_name, offset, function_name, sink_url, last_modified)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
RETURNING id, last_modified;
`

dstore.RWMutex.Lock()
row := dstore.db.QueryRow(insertQuery, workflow.Id, workflow.TopicName, workflow.Offset, workflow.FunctionName, workflow.SinkURL)
dstore.RWMutex.Unlock()

var insertedID int
var lastModified time.Time

err := row.Scan(&insertedID, &lastModified)
if err != nil {
return time.Time{}, err
}

return lastModified, nil
}

func (dstore *Datastore) DeleteWorkflow(id int) (int, error) {
deleteQuery := "DELETE FROM workflows WHERE id=?"
dstore.RWMutex.Lock()
res, err := dstore.db.Exec(deleteQuery, id)
dstore.RWMutex.Unlock()
if err != nil {
return 0, err
}
rowsDeleted, err := res.RowsAffected()
if err != nil {
return 0, err
}
return int(rowsDeleted), nil
}

func (dstore *Datastore) UpdateWorkflow(id int) (Workflow, error) {
updateQuery := `
UPDATE workflows
SET enabled = NOT enabled, last_modified = CURRENT_TIMESTAMP
WHERE id = ?
RETURNING id, topic_name, offset, function_name, enabled, sink_url, last_modified;
`

dstore.RWMutex.Lock()
row := dstore.db.QueryRow(updateQuery, id)
dstore.RWMutex.Unlock()
var workflow Workflow
err := row.Scan(&workflow.Id, &workflow.TopicName, &workflow.Offset, &workflow.FunctionName, &workflow.Enabled, &workflow.SinkURL, &workflow.LastModified)
if err != nil {
return workflow, err
}
return workflow, err
}
Loading