Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Storage Factory #1558

Merged
merged 8 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 13 additions & 23 deletions cmd/keytransparency-sequencer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ import (
"github.com/google/keytransparency/core/adminserver"
"github.com/google/keytransparency/core/sequencer"
"github.com/google/keytransparency/core/sequencer/election"
"github.com/google/keytransparency/impl/mysql/directory"
"github.com/google/keytransparency/impl/mysql/mutationstorage"
"github.com/google/keytransparency/impl"
"github.com/google/keytransparency/internal/forcemaster"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
dir "github.com/google/keytransparency/core/directory"
spb "github.com/google/keytransparency/core/sequencer/sequencer_go_proto"
ktsql "github.com/google/keytransparency/impl/mysql"
etcdelect "github.com/google/trillian/util/election2/etcd"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"

Expand All @@ -63,8 +61,8 @@ var (
etcdServers = flag.String("etcd_servers", "", "A comma-separated list of etcd servers; no etcd registration if empty")
lockDir = flag.String("lock_file_path", "/keytransparency/master", "etcd lock file directory path")

serverDBPath = flag.String("db", "db", "Database connection string")

dbPath = flag.String("db", "", "Database connection string")
dbEngine = flag.String("db_engine", "mysql", fmt.Sprintf("Storage engines: %v", impl.StorageEngines()))
// Info to connect to the trillian map and log.
mapURL = flag.String("map-url", "", "URL of Trillian Map Server")
logURL = flag.String("log-url", "", "URL of Trillian Log Server for Signed Map Heads")
Expand Down Expand Up @@ -119,20 +117,11 @@ func main() {
}

// Database tables
sqldb, err := ktsql.Open(*serverDBPath)
db, err := impl.NewStorage(ctx, *dbEngine, *dbPath)
if err != nil {
glog.Exit(err)
}
defer sqldb.Close()

mutations, err := mutationstorage.New(sqldb)
if err != nil {
glog.Exitf("Failed to create mutations object: %v", err)
}
directoryStorage, err := directory.NewStorage(sqldb)
if err != nil {
glog.Exitf("Failed to create directory storage object: %v", err)
}
defer db.Close()

grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
Expand All @@ -147,11 +136,12 @@ func main() {
defer done()

spb.RegisterKeyTransparencySequencerServer(grpcServer, sequencer.NewServer(
directoryStorage,
db.Directories,
trillian.NewTrillianLogClient(lconn),
trillian.NewTrillianMapClient(mconn),
trillian.NewTrillianMapWriteClient(mconn),
mutations, mutations,
db.Batches,
db.Logs,
spb.NewKeyTransparencySequencerClient(conn),
prometheus.MetricFactory{}))

Expand All @@ -160,9 +150,9 @@ func main() {
trillian.NewTrillianMapClient(mconn),
trillian.NewTrillianAdminClient(lconn),
trillian.NewTrillianAdminClient(mconn),
directoryStorage,
mutations,
mutations,
db.Directories,
db.Logs,
db.Batches,
func(ctx context.Context, spec *keyspb.Specification) (proto.Message, error) {
return der.NewProtoFromSpec(spec)
}))
Expand All @@ -172,12 +162,12 @@ func main() {
grpc_prometheus.EnableHandlingTimeHistogram()

g, gctx := errgroup.WithContext(ctx)
g.Go(func() error { return serverutil.ServeHTTPMetrics(*metricsAddr, serverutil.Readyz(sqldb)) })
g.Go(func() error { return serverutil.ServeHTTPMetrics(*metricsAddr, serverutil.Readyz(db)) })
g.Go(func() error {
return serverutil.ServeHTTPAPIAndGRPC(gctx, lis, grpcServer, conn,
pb.RegisterKeyTransparencyAdminHandler)
})
go runSequencer(gctx, conn, directoryStorage)
go runSequencer(gctx, conn, db.Directories)

glog.Errorf("Sequencer exiting: %v", g.Wait())
}
Expand Down
36 changes: 13 additions & 23 deletions cmd/keytransparency-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"context"
"flag"
"fmt"

"github.com/golang/glog"
"github.com/google/trillian"
Expand All @@ -28,13 +29,11 @@ import (
"github.com/google/keytransparency/cmd/serverutil"
"github.com/google/keytransparency/core/keyserver"
"github.com/google/keytransparency/core/mutator/entry"
"github.com/google/keytransparency/impl"
"github.com/google/keytransparency/impl/authentication"
"github.com/google/keytransparency/impl/authorization"
"github.com/google/keytransparency/impl/mysql/directory"
"github.com/google/keytransparency/impl/mysql/mutationstorage"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
ktsql "github.com/google/keytransparency/impl/mysql"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand All @@ -43,12 +42,13 @@ import (
)

var (
addr = flag.String("addr", ":8080", "The ip:port combination to listen on")
metricsAddr = flag.String("metrics-addr", ":8081", "The ip:port to publish metrics on")
serverDBPath = flag.String("db", "test:zaphod@tcp(localhost:3306)/test", "Database connection string")
keyFile = flag.String("tls-key", "genfiles/server.key", "TLS private key file")
certFile = flag.String("tls-cert", "genfiles/server.crt", "TLS cert file")
authType = flag.String("auth-type", "google", "Sets the type of authentication required from clients to update their entries. Accepted values are google (oauth tokens) and insecure-fake (for testing only).")
addr = flag.String("addr", ":8080", "The ip:port combination to listen on")
metricsAddr = flag.String("metrics-addr", ":8081", "The ip:port to publish metrics on")
dbPath = flag.String("db", "test:zaphod@tcp(localhost:3306)/test", "Database connection string")
dbEngine = flag.String("db_engine", "mysql", fmt.Sprintf("Storage engines: %v", impl.StorageEngines()))
keyFile = flag.String("tls-key", "genfiles/server.key", "TLS private key file")
certFile = flag.String("tls-cert", "genfiles/server.crt", "TLS cert file")
authType = flag.String("auth-type", "google", "Sets the type of authentication required from clients to update their entries. Accepted values are google (oauth tokens) and insecure-fake (for testing only).")

mapURL = flag.String("map-url", "", "URL of Trillian Map Server")
logURL = flag.String("log-url", "", "URL of Trillian Log Server for Signed Map Heads")
Expand All @@ -60,11 +60,11 @@ func main() {
ctx := context.Background()

// Open Resources.
sqldb, err := ktsql.Open(*serverDBPath)
db, err := impl.NewStorage(ctx, *dbEngine, *dbPath)
if err != nil {
glog.Exit(err)
}
defer sqldb.Close()
defer db.Close()

authz := &authorization.AuthzPolicy{}
var authFunc grpc_auth.AuthFunc
Expand All @@ -83,16 +83,6 @@ func main() {
glog.Exitf("Invalid auth-type parameter: %v.", *authType)
}

// Create database and helper objects.
directories, err := directory.NewStorage(sqldb)
if err != nil {
glog.Exitf("Failed to create directory storage: %v", err)
}
logs, err := mutationstorage.New(sqldb)
if err != nil {
glog.Exitf("Failed to create mutations storage: %v", err)
}

// Connect to log and map server.
tconn, err := grpc.Dial(*logURL, grpc.WithInsecure())
if err != nil {
Expand All @@ -106,7 +96,7 @@ func main() {
tmap := trillian.NewTrillianMapClient(mconn)

// Create gRPC server.
ksvr := keyserver.New(tlog, tmap, entry.IsValidEntry, directories, logs, logs,
ksvr := keyserver.New(tlog, tmap, entry.IsValidEntry, db.Directories, db.Logs, db.Batches,
prometheus.MetricFactory{}, int32(*revisionPageSize))
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
Expand Down Expand Up @@ -137,7 +127,7 @@ func main() {
defer done()

g, gctx := errgroup.WithContext(ctx)
g.Go(func() error { return serverutil.ServeHTTPMetrics(*metricsAddr, serverutil.Readyz(sqldb)) })
g.Go(func() error { return serverutil.ServeHTTPMetrics(*metricsAddr, serverutil.Readyz(db)) })
g.Go(func() error {
return serverutil.ServeHTTPAPIAndGRPC(gctx, lis, grpcServer, conn, pb.RegisterKeyTransparencyHandler)
})
Expand Down
5 changes: 2 additions & 3 deletions cmd/serverutil/readyz.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
package serverutil

import (
"database/sql"
"net/http"
)

// Readyz is a readiness probe.
func Readyz(db *sql.DB) http.HandlerFunc {
func Readyz(db interface{ HealthCheck() error }) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if db == nil || db.PingContext(r.Context()) != nil {
if db == nil || db.HealthCheck() != nil {
http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
return
}
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
bitbucket.org/creachadair/shell v0.0.6 h1:reJflDbKqnlnqb4Oo2pQ1/BqmY/eCWcNGHrIUO8qIzc=
bitbucket.org/creachadair/shell v0.0.6/go.mod h1:8Qqi/cYk7vPnsOePHroKXDJYmb5x7ENhtiFtfZq8K+M=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg=
Expand Down Expand Up @@ -43,6 +44,7 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0 h1:86K1Gel7BQ9/WmNWn7dTKMvTLFzwtBe5FNqYbi9X35g=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
contrib.go.opencensus.io/exporter/stackdriver v0.12.1/go.mod h1:iwB6wGarfphGGe/e5CWqyUk/cLzKnWsOKPVW3no6OTw=
contrib.go.opencensus.io/exporter/stackdriver v0.13.0 h1:Jaz7WbqjtfoCPa1KbfisCX+P5aM3DizEY9pQMU0oAQo=
contrib.go.opencensus.io/exporter/stackdriver v0.13.0/go.mod h1:z2tyTZtPmQ2HvWH4cOmVDgtY+1lomfKdbLnkJvZdc8c=
contrib.go.opencensus.io/resource v0.1.1/go.mod h1:F361eGI91LCmW1I/Saf+rX0+OFcigGlFvXwEGEnkRLA=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
Expand Down Expand Up @@ -76,6 +78,7 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
github.com/aws/aws-sdk-go v1.19.18/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.25.37/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.25.39 h1:1xxya3nsUaFlEZuoE5PWsIEd47RoDV/kkOGt0qEuwNw=
github.com/aws/aws-sdk-go v1.25.39/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand All @@ -85,6 +88,7 @@ github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQ
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
Expand Down Expand Up @@ -317,6 +321,7 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jhump/protoreflect v1.5.0 h1:NgpVT+dX71c8hZnxHof2M7QDK7QtohIJ7DYycjnkyfc=
github.com/jhump/protoreflect v1.5.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down Expand Up @@ -831,6 +836,7 @@ google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587 h1:1Ym+vvUpq1ZHvxzn34gENJX8U4aKO+vhy2P/2+Xl6qQ=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
2 changes: 1 addition & 1 deletion impl/mysql/dberrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package sql
package mysql

import (
"errors"
Expand Down
2 changes: 1 addition & 1 deletion impl/mysql/dberrors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package sql
package mysql

import (
"fmt"
Expand Down
4 changes: 2 additions & 2 deletions impl/mysql/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sql provides functions for interacting with MySQL.
package sql
// Package mysql provides functions for interacting with MySQL.
package mysql

import (
"database/sql"
Expand Down
110 changes: 110 additions & 0 deletions impl/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2020 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package impl

import (
"context"
"fmt"

"cloud.google.com/go/spanner"
"github.com/google/keytransparency/core/directory"
"github.com/google/keytransparency/core/sequencer"
"github.com/google/keytransparency/core/water"
"github.com/google/keytransparency/impl/mysql"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
mysqldir "github.com/google/keytransparency/impl/mysql/directory"
mysqlmutations "github.com/google/keytransparency/impl/mysql/mutationstorage"
spanbatch "github.com/google/keytransparency/impl/spanner/batch"
spandir "github.com/google/keytransparency/impl/spanner/directory"
spanmutations "github.com/google/keytransparency/impl/spanner/mutations"
)

// Storage holds an abstract storage implementation
type Storage struct {
Directories directory.Storage
Logs interface {
sequencer.LogsReader

// Copied methods from adminserver.LogsAdmin because of duplicate method.

// AddLogs creates and adds new logs for writing to a directory.
AddLogs(ctx context.Context, directoryID string, logIDs ...int64) error
// SetWritable enables or disables new writes from going to logID.
SetWritable(ctx context.Context, directoryID string, logID int64, enabled bool) error

// Copied methods from keyserver.MutationLogs
SendBatch(ctx context.Context, directoryID string, logID int64, batch []*pb.EntryUpdate) (water.Mark, error)
}
Comment on lines +38 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you get a Go Native to review this? It looks a bit clunky to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a known issue: golang/go#6977
https://golang.org/ref/spec#Uniqueness_of_identifiers

I think these could be merged if the interfaces were in the same package, because they are in different packages, they are different identifiers.

Batches sequencer.Batcher
healthCheck func() error
Close func()
}

// HealthCheck reports on the health of the underlying database connection.
func (s *Storage) HealthCheck() error { return s.healthCheck() }

// StorageEngines returns a list of supported storage engines.
func StorageEngines() []string { return []string{"mysql", "spanner"} }

// NewStorage returns a Storage with the requested engine.
func NewStorage(ctx context.Context, engine, db string) (*Storage, error) {
switch engine {
case "mysql":
return mysqlStorage(db)
case "spanner":
return spannerStorage(ctx, db)
default:
return nil, fmt.Errorf("unknown db engine %s", engine)
}
}

func spannerStorage(ctx context.Context, db string) (*Storage, error) {
spanClient, err := spanner.NewClient(ctx, db)
if err != nil {
return nil, err
}
return &Storage{
Directories: spandir.New(spanClient),
Batches: spanbatch.New(spanClient),
Logs: spanmutations.New(spanClient),
healthCheck: func() error { return nil },
Close: spanClient.Close,
}, nil
}

func mysqlStorage(db string) (*Storage, error) {
sqldb, err := mysql.Open(db)
if err != nil {
return nil, err
}
directories, err := mysqldir.NewStorage(sqldb)
if err != nil {
sqldb.Close()
return nil, fmt.Errorf("failed to create directory storage: %w", err)
}
logs, err := mysqlmutations.New(sqldb)
if err != nil {
sqldb.Close()
return nil, fmt.Errorf("failed to create mutations storage: %w", err)
}
return &Storage{
Directories: directories,
Batches: logs,
Logs: logs,
healthCheck: sqldb.Ping,
Close: func() { sqldb.Close() },
}, nil
}