Skip to content

Commit

Permalink
Remove admission code (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
HomayoonAlimohammadi authored Oct 1, 2024
1 parent e4c74a0 commit a01cac6
Show file tree
Hide file tree
Showing 11 changed files with 5 additions and 361 deletions.
11 changes: 0 additions & 11 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ var (
watchAvailableStorageMinBytes uint64
lowAvailableStorageAction string

admissionControlPolicy string
acpLimitMaxConcurrentTxn int64
acpOnlyWriteQueries bool

etcdMode bool
watchQueryTimeout time.Duration
}
Expand Down Expand Up @@ -108,9 +104,6 @@ var (
rootCmdOpts.watchAvailableStorageInterval,
rootCmdOpts.watchAvailableStorageMinBytes,
rootCmdOpts.lowAvailableStorageAction,
rootCmdOpts.admissionControlPolicy,
rootCmdOpts.acpLimitMaxConcurrentTxn,
rootCmdOpts.acpOnlyWriteQueries,
rootCmdOpts.connectionPoolConfig,
rootCmdOpts.watchQueryTimeout,
)
Expand Down Expand Up @@ -187,10 +180,6 @@ func init() {
rootCmd.Flags().DurationVar(&rootCmdOpts.watchAvailableStorageInterval, "watch-storage-available-size-interval", 5*time.Second, "Interval to check if the disk is running low on space. Set to 0 to disable the periodic disk size check")
rootCmd.Flags().Uint64Var(&rootCmdOpts.watchAvailableStorageMinBytes, "watch-storage-available-size-min-bytes", 10*1024*1024, "Minimum required available disk size (in bytes) to continue operation. If available disk space gets below this threshold, then the --low-available-storage-action is performed")
rootCmd.Flags().StringVar(&rootCmdOpts.lowAvailableStorageAction, "low-available-storage-action", "none", "Action to perform in case the available storage is low. One of (none|handover|terminate). none means no action is performed. handover means the dqlite node will handover its leadership role, if any. terminate means this dqlite node will shutdown")
rootCmd.Flags().StringVar(&rootCmdOpts.admissionControlPolicy, "admission-control-policy", "allow-all", "Transaction admission control policy to use. One of (allow-all|limit-concurrent-transactions). Set to allow-all to disable the admission control")
// TODO(MK-1408): This value is highly dependent on underlying hardware, thus making the default value a bit useless. The linked card will implement a dynamic way to set this value.
rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.")
rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.")
rootCmd.Flags().DurationVar(&rootCmdOpts.watchQueryTimeout, "watch-query-timeout", 20*time.Second, "Timeout for querying events in the watch poll loop. If timeout is reached, the poll loop will be re-triggered. The minimum value is 5 seconds.")

rootCmd.AddCommand(&cobra.Command{
Expand Down
6 changes: 3 additions & 3 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ The following configuration options are available listed in a table format:
| `--watch-storage-available-size-interval` | Interval to check if the disk is running low on space | `5s` |
| `--watch-storage-available-size-min-bytes` | Minimum required available disk size (in bytes) to continue operation | `10*1024*1024`|
| `--low-available-storage-action` | Action to perform in case the available storage is low | `none` |
| `--admission-control-policy` | Transaction admission control policy to use | `allow-all` |
| `--admission-control-policy-limit` | Maximum number of transactions that are allowed to run concurrently | `300` |
| `--admission-control-only-for-write-queries` | If set, admission control will only be applied to write queries | `false` |
| ~~`--admission-control-policy`~~ | `REMOVED` | - |
| ~~`--admission-control-policy-limit`~~ | `REMOVED` | - |
| ~~`--admission-control-only-for-write-queries`~~ | `REMOVED` | - |
| `--watch-query-timeout` | Timeout for querying events in the watch poll loop | `20s` |

## Observability
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.21.0
google.golang.org/grpc v1.65.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -72,6 +71,7 @@ require (
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 // indirect
Expand Down
98 changes: 0 additions & 98 deletions pkg/kine/drivers/generic/admission.go

This file was deleted.

69 changes: 0 additions & 69 deletions pkg/kine/drivers/generic/admission_test.go

This file was deleted.

27 changes: 0 additions & 27 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ type Generic struct {
TranslateErr TranslateErr
ErrCode ErrCode

AdmissionControlPolicy AdmissionControlPolicy

// CompactInterval is interval between database compactions performed by kine.
CompactInterval time.Duration
// PollInterval is the event poll interval used by kine.
Expand Down Expand Up @@ -352,7 +350,6 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig

FillSQL: q(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered),
AdmissionControlPolicy: &allowAllPolicy{},
}, err
}

Expand Down Expand Up @@ -382,12 +379,6 @@ func (d *Generic) query(ctx context.Context, txName, query string, args ...inter
attribute.String("tx_name", txName),
)

done, err := d.AdmissionControlPolicy.Admit(ctx, txName)
if err != nil {
return nil, fmt.Errorf("denied: %w", err)
}
defer done()

start := time.Now()
retryCount := 0
defer func() {
Expand Down Expand Up @@ -426,12 +417,6 @@ func (d *Generic) execute(ctx context.Context, txName, query string, args ...int
attribute.String("tx_name", txName),
)

done, err := d.AdmissionControlPolicy.Admit(ctx, txName)
if err != nil {
return nil, fmt.Errorf("denied: %w", err)
}
defer done()

if d.LockWrites {
d.Lock()
defer d.Unlock()
Expand Down Expand Up @@ -708,12 +693,6 @@ func (d *Generic) GetCompactRevision(ctx context.Context) (int64, int64, error)
span.End()
}()

done, err := d.AdmissionControlPolicy.Admit(ctx, "revision_interval_sql")
if err != nil {
return 0, 0, fmt.Errorf("denied: %w", err)
}
defer done()

rows, err := d.query(ctx, "revision_interval_sql", revisionIntervalSQL)
if err != nil {
return 0, 0, err
Expand Down Expand Up @@ -793,12 +772,6 @@ func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
span.End()
}()

done, err := d.AdmissionControlPolicy.Admit(ctx, "rev_sql")
if err != nil {
return 0, fmt.Errorf("denied: %w", err)
}
defer done()

rows, err := d.query(ctx, "rev_sql", revSQL)
if err != nil {
return 0, err
Expand Down
9 changes: 0 additions & 9 deletions pkg/kine/drivers/generic/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ var (
Name: "k8s_dqlite_generic_current_ops",
Help: "Total number of database operations that are currently running by tx_name",
}, []string{"tx_name"})
metricsOpAdmissionControl = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "k8s_dqlite_generic_op_admission_control",
Help: "Total number of database operations that the admission control handled by tx_name and result",
}, []string{"tx_name", "result"})
)

func errorToResultLabel(err error) string {
Expand All @@ -47,10 +43,6 @@ func recordOpResult(txName string, err error, startTime time.Time) {
metricsOpResult.WithLabelValues(txName, resultLabel).Inc()
}

func recordOpAdmissionControl(txName string, status string) {
metricsOpAdmissionControl.WithLabelValues(txName, status).Inc()
}

func incCurrentOps(txName string) {
metricsCurrentOps.WithLabelValues(txName).Inc()
}
Expand All @@ -65,6 +57,5 @@ func init() {
metricsOpResult,
metricsOpLatency,
metricsCurrentOps,
metricsOpAdmissionControl,
)
}
24 changes: 0 additions & 24 deletions pkg/kine/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/url"
"os"
"strconv"
"strings"
"time"

Expand All @@ -26,10 +25,6 @@ type opts struct {
compactInterval time.Duration
pollInterval time.Duration
watchQueryTimeout time.Duration

admissionControlPolicy string
admissionControlPolicyLimitMaxConcurrentTxn int64
admissionControlOnlyWriteQueries bool
}

func New(ctx context.Context, dataSourceName string, connectionPoolConfig *generic.ConnectionPoolConfig) (server.Backend, error) {
Expand Down Expand Up @@ -95,11 +90,6 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connecti
dialect.CompactInterval = opts.compactInterval
dialect.PollInterval = opts.pollInterval
dialect.WatchQueryTimeout = opts.watchQueryTimeout
dialect.AdmissionControlPolicy = generic.NewAdmissionControlPolicy(
opts.admissionControlPolicy,
opts.admissionControlOnlyWriteQueries,
opts.admissionControlPolicyLimitMaxConcurrentTxn,
)

if driverName == "sqlite3" {
dialect.Retry = func(err error) bool {
Expand Down Expand Up @@ -221,20 +211,6 @@ func parseOpts(dsn string) (opts, error) {
return opts{}, fmt.Errorf("failed to parse watch-query-timeout duration value %q: %w", vs[0], err)
}
result.watchQueryTimeout = d
case "admission-control-policy":
result.admissionControlPolicy = vs[0]
case "admission-control-policy-limit-max-concurrent-txn":
d, err := strconv.ParseInt(vs[0], 10, 64)
if err != nil {
return opts{}, fmt.Errorf("failed to parse max-concurrent-txn value %q: %w", vs[0], err)
}
result.admissionControlPolicyLimitMaxConcurrentTxn = d
case "admission-control-only-write-queries":
d, err := strconv.ParseBool(vs[0])
if err != nil {
return opts{}, fmt.Errorf("failed to parse admission-control-only-writes value %q: %w", vs[0], err)
}
result.admissionControlOnlyWriteQueries = d
default:
return opts{}, fmt.Errorf("unknown option %s=%v", k, vs)
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ func New(
watchAvailableStorageInterval time.Duration,
watchAvailableStorageMinBytes uint64,
lowAvailableStorageAction string,
admissionControlPolicy string,
admissionControlPolicyLimitMaxConcurrentTxn int64,
admissionControlOnlyWriteQueries bool,
connectionPoolConfig generic.ConnectionPoolConfig,
watchQueryTimeout time.Duration,
) (*Server, error) {
Expand Down Expand Up @@ -274,9 +271,6 @@ func New(
params["poll-interval"] = []string{fmt.Sprintf("%v", *v)}
}

params["admission-control-policy"] = []string{admissionControlPolicy}
params["admission-control-policy-limit-max-concurrent-txn"] = []string{fmt.Sprintf("%v", admissionControlPolicyLimitMaxConcurrentTxn)}
params["admission-control-only-write-queries"] = []string{fmt.Sprintf("%v", admissionControlOnlyWriteQueries)}
params["watch-query-timeout"] = []string{fmt.Sprintf("%v", watchQueryTimeout)}

kineConfig.Listener = listen
Expand Down
Loading

0 comments on commit a01cac6

Please sign in to comment.