From c849c531bf13869a6e84fe1557c83b7fe37e06f5 Mon Sep 17 00:00:00 2001 From: Barry Date: Mon, 16 Dec 2024 11:59:35 +0800 Subject: [PATCH] update --- cmd/run.go | 13 ++-- config/default.go | 9 +++ db/service.go | 157 ++++++++++++++++----------------------------- db/types/sqlite.go | 25 ++++++++ 4 files changed, 97 insertions(+), 107 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 05472c10..d9ddb13f 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "fmt" + "github.com/0xPolygon/cdk/db/types" "math/big" "net/http" "os" @@ -824,15 +825,15 @@ func runSqliteServiceIfNeeded( if isNeeded([]string{ cdkcommon.AGGREGATOR}, components) { - dbPath[sqldb.AGG_TX_MGR] = cfg.Aggregator.EthTxManager.StoragePath - dbPath[sqldb.AGG_SYNC] = cfg.Aggregator.Synchronizer.SQLDB.DataSource - dbPath[sqldb.AGG_REORG_L1] = cfg.ReorgDetectorL1.DBPath + dbPath[types.AggTxMgr] = cfg.Aggregator.EthTxManager.StoragePath + dbPath[types.AggSync] = cfg.Aggregator.Synchronizer.SQLDB.DataSource + dbPath[types.AggReorgL1] = cfg.ReorgDetectorL1.DBPath } else if isNeeded([]string{ cdkcommon.SEQUENCE_SENDER}, components) { - dbPath[sqldb.SEQS_TX_MGR] = cfg.SequenceSender.EthTxManager.StoragePath - dbPath[sqldb.SEQS_L1_TREE] = cfg.L1InfoTreeSync.DBPath - dbPath[sqldb.SEQS_REORG_L1] = cfg.ReorgDetectorL1.DBPath + dbPath[types.SeqsTxMgr] = cfg.SequenceSender.EthTxManager.StoragePath + dbPath[types.SeqsL1Tree] = cfg.L1InfoTreeSync.DBPath + dbPath[types.SeqsReorgL1] = cfg.ReorgDetectorL1.DBPath } else { log.Warn("No need to start sqlite service") return diff --git a/config/default.go b/config/default.go index 6a505b88..4668d4d9 100644 --- a/config/default.go +++ b/config/default.go @@ -340,4 +340,13 @@ SaveCertificatesToFilesPath = "" MaxRetriesStoreCertificate = 3 DelayBeetweenRetries = "60s" KeepCertificatesHistory = true + +[Sqlite] +Host = "0.0.0.0" +Port = 8081 +ReadTimeout = "2s" +WriteTimeout = "2s" +AuthMethodList = "select,insert,update,delete" +MaxRequestsPerIPAndSecond = 500 + ` diff --git a/db/service.go b/db/service.go index 1d9500ec..18710c0a 100644 --- a/db/service.go +++ b/db/service.go @@ -2,67 +2,41 @@ package db import ( "context" - dbSql "database/sql" "errors" "fmt" "strings" "time" + "database/sql" "github.com/0xPolygon/cdk-rpc/rpc" "github.com/0xPolygon/cdk/db/types" "github.com/0xPolygon/cdk/log" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric" -) - -const ( - NAME = "sqlite" - meterName = "github.com/0xPolygon/cdk/sqlite/service" - - METHOD_SELECT = "select" - METHOD_INSERT = "insert" - METHOD_UPDATE = "update" - METHOD_DELETE = "delete" - - LIMIT_SQL_LEN = 6 - - zeroHex = "0x0" - - SEQS_L1_TREE = "seqs_l1tree" - SEQS_TX_MGR = "seqs_txmgr" - SEQS_REORG_L1 = "seqs_reorg_l1" - - AGG_SYNC = "agg_sync" - AGG_TX_MGR = "agg_txmgr" - AGG_REORG_L1 = "agg_reorg_l1" ) type SqliteEndpoints struct { logger *log.Logger - meter metric.Meter readTimeout time.Duration writeTimeout time.Duration authMethods []string dbMaps map[string]string - sqlDBs map[string]*dbSql.DB + sqlDBs map[string]*sql.DB } func CreateSqliteService( cfg Config, dbMaps map[string]string, ) *rpc.Server { - logger := log.WithFields("module", NAME) + logger := log.WithFields("module", types.NAME) - meter := otel.Meter(meterName) methodList := strings.Split(cfg.AuthMethodList, ",") log.Info(fmt.Sprintf("Sqlite service method auth list: %s", methodList)) for _, s := range methodList { methodList = append(methodList, s) } log.Info(fmt.Sprintf("Sqlite service dbMaps: %v", dbMaps)) - sqlDBs := make(map[string]*dbSql.DB) + sqlDBs := make(map[string]*sql.DB) for k, dbPath := range dbMaps { log.Info(fmt.Sprintf("Sqlite service: %s, %s", k, dbPath)) db, err := NewSQLiteDB(dbPath) @@ -75,10 +49,9 @@ func CreateSqliteService( services := []rpc.Service{ { - Name: NAME, + Name: types.NAME, Service: &SqliteEndpoints{ logger: logger, - meter: meter, readTimeout: cfg.ReadTimeout.Duration, writeTimeout: cfg.WriteTimeout.Duration, authMethods: methodList, @@ -97,23 +70,19 @@ func CreateSqliteService( }, services, rpc.WithLogger(logger.GetSugaredLogger())) } -type A struct { - Fields map[string]interface{} -} - func (b *SqliteEndpoints) Select( - db string, - sql string, + dbName string, + sqlCmd string, ) (interface{}, rpc.Error) { - err, dbCon := b.checkAndGetDB(db, sql, METHOD_SELECT) + err, dbCon := b.checkAndGetDB(dbName, sqlCmd, types.MethodSelect) if err != nil { - return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("check params invalid: %s", err.Error())) + return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("check params invalid: %s", err.Error())) } ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) defer cancel() - rows, err := dbCon.QueryContext(ctx, sql) + rows, err := dbCon.QueryContext(ctx, sqlCmd) if err != nil { - if errors.Is(err, dbSql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { return nil, rpc.NewRPCError( rpc.DefaultErrorCode, fmt.Sprintf("No rows"), ErrNotFound) } @@ -129,13 +98,35 @@ func (b *SqliteEndpoints) Select( } func (b *SqliteEndpoints) Insert( + dbName string, + sqlCmd string, +) (interface{}, rpc.Error) { + return b.alterMethod(types.MethodInsert, dbName, sqlCmd) +} + +func (b *SqliteEndpoints) Delete( + dbName string, + sqlCmd string, +) (interface{}, rpc.Error) { + return b.alterMethod(types.MethodDelete, dbName, sqlCmd) +} + +func (b *SqliteEndpoints) Update( + dbName string, + sqlCmd string, +) (interface{}, rpc.Error) { + return b.alterMethod(types.MethodUpdate, dbName, sqlCmd) +} + +func (b *SqliteEndpoints) alterMethod( + method string, db string, sql string, ) (interface{}, rpc.Error) { - log.Info(fmt.Sprintf("Sqlite service insert: %s, %s", db, sql)) - err, dbCon := b.checkAndGetDB(db, sql, METHOD_INSERT) + log.Info(fmt.Sprintf("Sqlite: %s, %s", db, sql)) + err, dbCon := b.checkAndGetDB(db, sql, method) if err != nil { - return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("check params invalid: %s", err.Error())) + return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("check params invalid: %s", err.Error())) } ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) defer cancel() @@ -143,13 +134,13 @@ func (b *SqliteEndpoints) Insert( tx, err := NewTx(ctx, dbCon) if err != nil { log.Error(fmt.Sprintf("failed to create tx: %s", err)) - return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to create tx: %s", err)) + return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to create tx: %s", err)) } shouldRollback := true defer func() { if shouldRollback { - if errRllbck := tx.Rollback(); errRllbck != nil { - log.Errorf("error while rolling back tx %v", errRllbck) + if errRollback := tx.Rollback(); errRollback != nil { + log.Errorf("error while rolling back tx %v", errRollback) } } }() @@ -157,17 +148,17 @@ func (b *SqliteEndpoints) Insert( ct, err := tx.Exec(sql) if err != nil { log.Error(fmt.Sprintf("failed to exec: %s", err)) - return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to exec: %s", err)) + return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to exec: %s", err)) } count, err := ct.RowsAffected() if err != nil { log.Error(fmt.Sprintf("failed to get rows affected: %s", err)) - return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to get rows affected: %s", err)) + return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to get rows affected: %s", err)) } if err := tx.Commit(); err != nil { log.Error(fmt.Sprintf("failed to commit: %s", err)) - return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to commit: %s", err)) + return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to commit: %s", err)) } shouldRollback = false @@ -176,45 +167,20 @@ func (b *SqliteEndpoints) Insert( }, nil } -func (b *SqliteEndpoints) Update( - db string, - sql string, -) (interface{}, rpc.Error) { - ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) - defer cancel() - c, merr := b.meter.Int64Counter("claim_proof") - if merr != nil { - b.logger.Warnf("failed to create claim_proof counter: %s", merr) - } - c.Add(ctx, 1) - - return types.SqliteData{}, nil -} - -func (b *SqliteEndpoints) Delete( - db string, - sql string, -) (interface{}, rpc.Error) { - log.Info(fmt.Sprintf("Sqlite service Delete: %s, %s", db, sql)) - - return types.SqliteData{}, nil -} - func (b *SqliteEndpoints) GetDbs() (interface{}, rpc.Error) { - //var dbList []string dbList := make(map[string][]string) + ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) + defer cancel() for k, dbPath := range b.dbMaps { log.Info(fmt.Sprintf("Sqlite service: %s, %s", k, dbPath)) - sql := "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name;" - err, dbCon := b.checkAndGetDB(k, sql, METHOD_SELECT) + sqlCmd := "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name;" + err, dbCon := b.checkAndGetDB(k, sqlCmd, types.MethodSelect) if err != nil { - return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("%s", err.Error())) + return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("%s", err.Error())) } - ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) - defer cancel() - rows, err := dbCon.QueryContext(ctx, sql) + rows, err := dbCon.QueryContext(ctx, sqlCmd) if err != nil { - if errors.Is(err, dbSql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { return nil, rpc.NewRPCError( rpc.DefaultErrorCode, fmt.Sprintf("No rows"), ErrNotFound) } @@ -225,24 +191,21 @@ func (b *SqliteEndpoints) GetDbs() (interface{}, rpc.Error) { if err != nil { return nil, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to get results: %s", err.Error())) } - dbList[k] = result } return dbList, nil } -func (b *SqliteEndpoints) checkAndGetDB(db string, sql string, method string) (error, *dbSql.DB) { - log.Info(fmt.Sprintf("Sqlite endpoints, check db:%v,sql:%v,method:%v", db, sql, method)) - if len(sql) <= LIMIT_SQL_LEN { +func (b *SqliteEndpoints) checkAndGetDB(db string, sql string, method string) (error, *sql.DB) { + log.Info(fmt.Sprintf("Sqlite check db:%v, sql:%v, method:%v", db, sql, method)) + if len(sql) <= types.LimitSqlLen { return fmt.Errorf("sql length is too short"), nil } - sqlMethod := strings.ToLower(sql[:6]) if sqlMethod != method { return fmt.Errorf("sql method is not valid"), nil } - found := false for _, str := range b.authMethods { if str == method { @@ -253,7 +216,6 @@ func (b *SqliteEndpoints) checkAndGetDB(db string, sql string, method string) (e if !found { return fmt.Errorf("sql method is not authorized"), nil } - dbCon, ok := b.sqlDBs[db] if !ok { return fmt.Errorf("sql db is not valid"), nil @@ -261,40 +223,34 @@ func (b *SqliteEndpoints) checkAndGetDB(db string, sql string, method string) (e return nil, dbCon } -func getResults(rows *dbSql.Rows) (error, []A) { - var result []A +func getResults(rows *sql.Rows) (error, []types.QueryData) { + var result []types.QueryData columns, err := rows.Columns() if err != nil { log.Error(fmt.Sprintf("Failed to get columns: %v", err)) return err, nil } for rows.Next() { - record := A{Fields: make(map[string]interface{})} - + record := types.QueryData{Fields: make(map[string]interface{})} values := make([]interface{}, len(columns)) valuePtrs := make([]interface{}, len(columns)) for i := range values { valuePtrs[i] = &values[i] } - if err := rows.Scan(valuePtrs...); err != nil { log.Error("Failed to scan row: %v", err) return err, nil } - for i, colName := range columns { record.Fields[colName] = values[i] } - result = append(result, record) } - return nil, result } -func getTables(rows *dbSql.Rows) (error, []string) { +func getTables(rows *sql.Rows) (error, []string) { var result []string - for rows.Next() { var tableName string if err := rows.Scan(&tableName); err != nil { @@ -303,6 +259,5 @@ func getTables(rows *dbSql.Rows) (error, []string) { log.Info(fmt.Sprintf("Table name: %s", tableName)) result = append(result, tableName) } - return nil, result } diff --git a/db/types/sqlite.go b/db/types/sqlite.go index 9cfd5847..6126a21b 100644 --- a/db/types/sqlite.go +++ b/db/types/sqlite.go @@ -3,3 +3,28 @@ package types type SqliteData struct { RowsAffected int64 } + +const ( + NAME = "sqlite" + + MethodSelect = "select" + MethodInsert = "insert" + MethodUpdate = "update" + MethodDelete = "delete" + + LimitSqlLen = 6 + + ZeroHex = "0x0" + + SeqsL1Tree = "seqs_l1tree" + SeqsTxMgr = "seqs_txmgr" + SeqsReorgL1 = "seqs_reorg_l1" + + AggSync = "agg_sync" + AggTxMgr = "agg_txmgr" + AggReorgL1 = "agg_reorg_l1" +) + +type QueryData struct { + Fields map[string]interface{} +}