diff --git a/conn/node.go b/conn/node.go
index a27fecab972..01416f0400c 100644
--- a/conn/node.go
+++ b/conn/node.go
@@ -38,6 +38,7 @@ import (
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/raftwal"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 )
 
 var (
@@ -647,7 +648,7 @@ func (n *Node) WaitLinearizableRead(ctx context.Context) error {
 }
 
 // RunReadIndexLoop runs the RAFT index in a loop.
-func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) {
+func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
 	defer closer.Done()
 	readIndex := func(activeRctx []byte) (uint64, error) {
 		// Read Request can get rejected then we would wait indefinitely on the channel
diff --git a/conn/pool.go b/conn/pool.go
index d81d3d36812..a822b2a4474 100644
--- a/conn/pool.go
+++ b/conn/pool.go
@@ -21,10 +21,10 @@ import (
 	"sync"
 	"time"
 
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgo/v200/protos/api"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/golang/glog"
 	"github.com/pkg/errors"
 	"go.opencensus.io/plugin/ocgrpc"
@@ -50,7 +50,7 @@ type Pool struct {
 
 	lastEcho   time.Time
 	Addr       string
-	closer     *y.Closer
+	closer     *z.Closer
 	healthInfo pb.HealthInfo
 }
 
@@ -175,7 +175,7 @@ func newPool(addr string) (*Pool, error) {
 	if err != nil {
 		return nil, err
 	}
-	pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: y.NewCloser(1)}
+	pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: z.NewCloser(1)}
 	go pl.MonitorHealth()
 	return pl, nil
 }
diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go
index 9055abc8148..55aa21e0f3f 100644
--- a/dgraph/cmd/alpha/run.go
+++ b/dgraph/cmd/alpha/run.go
@@ -36,7 +36,6 @@ import (
 
 	"github.com/dgraph-io/dgraph/graphql/web"
 
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgo/v200/protos/api"
 	"github.com/dgraph-io/dgraph/edgraph"
 	"github.com/dgraph-io/dgraph/ee/enc"
@@ -46,6 +45,7 @@ import (
 	"github.com/dgraph-io/dgraph/tok"
 	"github.com/dgraph-io/dgraph/worker"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/golang/glog"
 	"github.com/pkg/errors"
 	"github.com/spf13/cast"
@@ -388,7 +388,7 @@ func setupListener(addr string, port int) (net.Listener, error) {
 	return net.Listen("tcp", fmt.Sprintf("%s:%d", addr, port))
 }
 
-func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
+func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
 	defer closer.Done()
 
 	x.RegisterExporters(Alpha.Conf, "dgraph.alpha")
@@ -411,7 +411,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
 	s.Stop()
 }
 
-func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
+func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
 	defer closer.Done()
 	srv := &http.Server{
 		ReadTimeout: 10 * time.Second,
@@ -434,7 +434,7 @@ func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
 	}
 }
 
-func setupServer(closer *y.Closer) {
+func setupServer(closer *z.Closer) {
 	go worker.RunServer(bindall) // For pb.communication.
 
 	laddr := "localhost"
@@ -546,7 +546,7 @@ func setupServer(closer *y.Closer) {
 	http.HandleFunc("/ui/keywords", keywordHandler)
 
 	// Initialize the servers.
-	admin.ServerCloser = y.NewCloser(3)
+	admin.ServerCloser = z.NewCloser(3)
 	go serveGRPC(grpcListener, tlsCfg, admin.ServerCloser)
 	go serveHTTP(httpListener, tlsCfg, admin.ServerCloser)
 
@@ -759,7 +759,7 @@ func run() {
 	}()
 
 	// Setup external communication.
-	aclCloser := y.NewCloser(1)
+	aclCloser := z.NewCloser(1)
 	go func() {
 		worker.StartRaftNodes(worker.State.WALstore, bindall)
 		// initialization of the admin account can only be done after raft nodes are running
@@ -770,7 +770,7 @@ func run() {
 
 	// Graphql subscribes to alpha to get schema updates. We need to close that before we
 	// close alpha. This closer is for closing and waiting that subscription.
-	adminCloser := y.NewCloser(1)
+	adminCloser := z.NewCloser(1)
 
 	setupServer(adminCloser)
 	glog.Infoln("GRPC and HTTP stopped.")
diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go
index e9bb3e35fba..6204b220fca 100644
--- a/dgraph/cmd/bulk/reduce.go
+++ b/dgraph/cmd/bulk/reduce.go
@@ -43,6 +43,7 @@ import (
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/worker"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 )
 
 type reducer struct {
@@ -333,7 +334,7 @@ func (r *reducer) streamIdFor(pred string) uint32 {
 	return streamId
 }
 
-func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) {
+func (r *reducer) encode(entryCh chan *encodeRequest, closer *z.Closer) {
 	defer closer.Done()
 
 	for req := range entryCh {
@@ -383,7 +384,7 @@ func (r *reducer) writeTmpSplits(ci *countIndexer, kvsCh chan *bpb.KVList, wg *s
 	x.Check(ci.splitWriter.Flush())
 }
 
-func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) {
+func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *z.Closer) {
 	defer closer.Done()
 
 	// Concurrently write split lists to a temporary badger.
@@ -456,14 +457,14 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
 	fmt.Printf("Num CPUs: %d\n", cpu)
 	encoderCh := make(chan *encodeRequest, 2*cpu)
 	writerCh := make(chan *encodeRequest, 2*cpu)
-	encoderCloser := y.NewCloser(cpu)
+	encoderCloser := z.NewCloser(cpu)
 	for i := 0; i < cpu; i++ {
 		// Start listening to encode entries
 		// For time being let's lease 100 stream id for each encoder.
 		go r.encode(encoderCh, encoderCloser)
 	}
 	// Start listening to write the badger list.
-	writerCloser := y.NewCloser(1)
+	writerCloser := z.NewCloser(1)
 	go r.startWriting(ci, writerCh, writerCloser)
 
 	for i := 0; i < len(partitionKeys); i++ {
diff --git a/dgraph/cmd/zero/license.go b/dgraph/cmd/zero/license.go
index 84f0c75e4b6..6de7366fe6b 100644
--- a/dgraph/cmd/zero/license.go
+++ b/dgraph/cmd/zero/license.go
@@ -21,8 +21,8 @@ package zero
 import (
 	"net/http"
 
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/protos/pb"
+	"github.com/dgraph-io/ristretto/z"
 )
 
 // dummy function as enterprise features are not available in oss binary.
@@ -31,7 +31,7 @@ func (n *node) proposeTrialLicense() error {
 	return nil
 }
 
 // periodically checks the validity of the enterprise license and updates the membership state.
-func (n *node) updateEnterpriseState(closer *y.Closer) {
+func (n *node) updateEnterpriseState(closer *z.Closer) {
 	closer.Done()
 }
diff --git a/dgraph/cmd/zero/license_ee.go b/dgraph/cmd/zero/license_ee.go
index 29ff9f64c70..95f8ef3c3d9 100644
--- a/dgraph/cmd/zero/license_ee.go
+++ b/dgraph/cmd/zero/license_ee.go
@@ -20,9 +20,9 @@ import (
 	"net/http"
 	"time"
 
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	humanize "github.com/dustin/go-humanize"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/glog"
@@ -61,7 +61,7 @@ func (s *Server) expireLicense() {
 // periodically checks the validity of the enterprise license and
 // 1. Sets license.Enabled to false in membership state if license has expired.
 // 2. Prints out warning once every day a week before the license is set to expire.
-func (n *node) updateEnterpriseState(closer *y.Closer) {
+func (n *node) updateEnterpriseState(closer *z.Closer) {
 	defer closer.Done()
 
 	interval := 5 * time.Second
diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index 120cb70bab4..bf2f39e58fa 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -28,10 +28,10 @@ import (
 
 	otrace "go.opencensus.io/trace"
 
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	farm "github.com/dgryski/go-farm"
 	"github.com/golang/glog"
 	"github.com/google/uuid"
@@ -44,7 +44,7 @@ type node struct {
 	*conn.Node
 	server *Server
 	ctx    context.Context
-	closer *y.Closer // to stop Run.
+	closer *z.Closer // to stop Run.
 
 	// The last timestamp when this Zero was able to reach quorum.
 	mu sync.RWMutex
@@ -533,7 +533,7 @@ func (n *node) initAndStartNode() error {
 	return nil
 }
 
-func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {
+func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) {
 	defer closer.Done()
 	ticker := time.NewTicker(10 * time.Second)
 	defer ticker.Stop()
@@ -550,7 +550,7 @@ func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {
 
 var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01))
 
-func (n *node) checkQuorum(closer *y.Closer) {
+func (n *node) checkQuorum(closer *z.Closer) {
 	defer closer.Done()
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
@@ -588,7 +588,7 @@ func (n *node) checkQuorum(closer *y.Closer) {
 	}
 }
 
-func (n *node) snapshotPeriodically(closer *y.Closer) {
+func (n *node) snapshotPeriodically(closer *z.Closer) {
 	defer closer.Done()
 	ticker := time.NewTicker(10 * time.Second)
 	defer ticker.Stop()
@@ -630,7 +630,7 @@ func (n *node) Run() {
 	// snapshot can cause select loop to block while deleting entries, so run
 	// it in goroutine
 	readStateCh := make(chan raft.ReadState, 100)
-	closer := y.NewCloser(5)
+	closer := z.NewCloser(5)
 	defer func() {
 		closer.SignalAndWait()
 		n.closer.Done()
diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go
index 1e613e54e5b..8f7d47fc917 100644
--- a/dgraph/cmd/zero/run.go
+++ b/dgraph/cmd/zero/run.go
@@ -35,12 +35,12 @@ import (
 
 	"github.com/dgraph-io/badger/v2"
 	bopt "github.com/dgraph-io/badger/v2/options"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/ee/enc"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/raftwal"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/golang/glog"
 	"github.com/spf13/cobra"
 )
@@ -151,7 +151,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) {
 	m.Cfg.DisableProposalForwarding = true
 	st.rs = conn.NewRaftServer(m)
 
-	st.node = &node{Node: m, ctx: context.Background(), closer: y.NewCloser(1)}
+	st.node = &node{Node: m, ctx: context.Background(), closer: z.NewCloser(1)}
 	st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node}
 	st.zero.Init()
 	st.node.server = st.zero
@@ -301,7 +301,7 @@ func run() {
 	x.Checkf(err, "Error while opening WAL store")
 	defer kv.Close()
 
-	gcCloser := y.NewCloser(1) // closer for vLogGC
+	gcCloser := z.NewCloser(1) // closer for vLogGC
 	go x.RunVlogGC(kv, gcCloser)
 	defer gcCloser.SignalAndWait()
diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go
index ed6b2bb184e..9f326fca396 100644
--- a/dgraph/cmd/zero/zero.go
+++ b/dgraph/cmd/zero/zero.go
@@ -26,12 +26,12 @@ import (
 
 	otrace "go.opencensus.io/trace"
 
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgo/v200/protos/api"
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/telemetry"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/glog"
 	"github.com/pkg/errors"
@@ -66,7 +66,7 @@ type Server struct {
 	// groupMap    map[uint32]*Group
 	nextGroup      uint32
 	leaderChangeCh chan struct{}
-	closer         *y.Closer  // Used to tell stream to close.
+	closer         *z.Closer  // Used to tell stream to close.
 	connectLock    sync.Mutex // Used to serialize connect requests from servers.
 	moveOngoing    chan struct{}
 
@@ -89,7 +89,7 @@ func (s *Server) Init() {
 	s.nextTxnTs = 1
 	s.nextGroup = 1
 	s.leaderChangeCh = make(chan struct{}, 1)
-	s.closer = y.NewCloser(2) // grpc and http
+	s.closer = z.NewCloser(2) // grpc and http
 	s.blockCommitsOn = new(sync.Map)
 	s.moveOngoing = make(chan struct{}, 1)
 
diff --git a/edgraph/access.go b/edgraph/access.go
index c79873105fc..e937938b3de 100644
--- a/edgraph/access.go
+++ b/edgraph/access.go
@@ -21,11 +21,11 @@ package edgraph
 import (
 	"context"
 
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgo/v200/protos/api"
 	"github.com/dgraph-io/dgraph/gql"
 	"github.com/dgraph-io/dgraph/query"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/golang/glog"
 )
 
@@ -47,7 +47,7 @@ func ResetAcl() {
 }
 
 // ResetAcls is an empty method since ACL is only supported in the enterprise version.
-func RefreshAcls(closer *y.Closer) {
+func RefreshAcls(closer *z.Closer) {
 	// do nothing
 	<-closer.HasBeenClosed()
 	closer.Done()
diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go
index d40b4405ecd..f581238cea5 100644
--- a/edgraph/access_ee.go
+++ b/edgraph/access_ee.go
@@ -20,13 +20,12 @@ import (
 	"time"
 
 	"github.com/dgraph-io/dgraph/protos/pb"
+	"github.com/dgraph-io/ristretto/z"
 
 	"github.com/dgraph-io/dgraph/query"
 
 	"github.com/pkg/errors"
 
-	"github.com/dgraph-io/badger/v2/y"
-
 	"github.com/dgraph-io/dgo/v200/protos/api"
 	"github.com/dgraph-io/dgraph/ee/acl"
 	"github.com/dgraph-io/dgraph/gql"
@@ -305,7 +304,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (
 }
 
 // RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly.
-func RefreshAcls(closer *y.Closer) {
+func RefreshAcls(closer *z.Closer) {
 	defer closer.Done()
 	if len(worker.Config.HmacSecret) == 0 {
 		// the acl feature is not turned on
diff --git a/go.mod b/go.mod
index cb871f26d32..3733d49ed84 100644
--- a/go.mod
+++ b/go.mod
@@ -19,7 +19,7 @@ require (
 	github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4
 	github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453
 	github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2
-	github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de
+	github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
 	github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
diff --git a/go.sum b/go.sum
index a4f5d4a4481..10283c26ef6 100644
--- a/go.sum
+++ b/go.sum
@@ -87,8 +87,9 @@ github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 h1:DTgOrw91nM
 github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig=
 github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2 h1:NSl3XXyON9bgmBJSAvr5FPrgILAovtoTs7FwdtaZZq0=
 github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
-github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA=
 github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
+github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66 h1:ectpJv2tGhTudyk0JhqE/53o/ObH30u5yt/yThsAn3I=
+github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go
index fa0e2dd5d82..9a98724b489 100644
--- a/graphql/admin/admin.go
+++ b/graphql/admin/admin.go
@@ -26,7 +26,6 @@ import (
 	"github.com/pkg/errors"
 
 	badgerpb "github.com/dgraph-io/badger/v2/pb"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/edgraph"
 	"github.com/dgraph-io/dgraph/graphql/resolve"
 	"github.com/dgraph-io/dgraph/graphql/schema"
@@ -35,6 +34,7 @@ import (
 	"github.com/dgraph-io/dgraph/query"
 	"github.com/dgraph-io/dgraph/worker"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 )
 
 const (
@@ -382,7 +382,7 @@ type adminServer struct {
 
 // NewServers initializes the GraphQL servers. It sets up an empty server for the
 // main /graphql endpoint and an admin server. The result is mainServer, adminServer.
-func NewServers(withIntrospection bool, globalEpoch *uint64, closer *y.Closer) (web.IServeGraphQL,
+func NewServers(withIntrospection bool, globalEpoch *uint64, closer *z.Closer) (web.IServeGraphQL,
 	web.IServeGraphQL, *GraphQLHealthStore) {
 	gqlSchema, err := schema.FromString("")
 	if err != nil {
@@ -411,7 +411,7 @@ func newAdminResolver(
 	fns *resolve.ResolverFns,
 	withIntrospection bool,
 	epoch *uint64,
-	closer *y.Closer) *resolve.RequestResolver {
+	closer *z.Closer) *resolve.RequestResolver {
 
 	adminSchema, err := schema.FromString(graphqlAdminSchema)
 	if err != nil {
diff --git a/graphql/admin/shutdown.go b/graphql/admin/shutdown.go
index 5fb6bf63344..e3d3469c16d 100644
--- a/graphql/admin/shutdown.go
+++ b/graphql/admin/shutdown.go
@@ -19,16 +19,16 @@ package admin
 import (
 	"context"
 
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/graphql/resolve"
 	"github.com/dgraph-io/dgraph/graphql/schema"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/golang/glog"
 )
 
 var (
 	// ServerCloser is used to signal and wait for other goroutines to return gracefully after user
 	// requests shutdown.
-	ServerCloser *y.Closer
+	ServerCloser *z.Closer
 )
 
 func resolveShutdown(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
diff --git a/posting/lists.go b/posting/lists.go
index adf0e3a2ea2..a61ff264a34 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -32,12 +32,12 @@ import (
 
 	ostats "go.opencensus.io/stats"
 
 	"github.com/dgraph-io/badger/v2"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgo/v200/protos/api"
-	"github.com/golang/glog"
-
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
+
+	"github.com/golang/glog"
 )
 
@@ -90,7 +90,7 @@ func getMemUsage() int {
 	return rss * os.Getpagesize()
 }
 
-func updateMemoryMetrics(lc *y.Closer) {
+func updateMemoryMetrics(lc *z.Closer) {
 	defer lc.Done()
 	ticker := time.NewTicker(time.Minute)
 	defer ticker.Stop()
@@ -130,13 +130,13 @@ func updateMemoryMetrics(lc *y.Closer) {
 
 var (
 	pstore *badger.DB
-	closer *y.Closer
+	closer *z.Closer
 )
 
 // Init initializes the posting lists package, the in memory and dirty list hash.
 func Init(ps *badger.DB) {
 	pstore = ps
-	closer = y.NewCloser(1)
+	closer = z.NewCloser(1)
 	go updateMemoryMetrics(closer)
 }
 
diff --git a/posting/mvcc.go b/posting/mvcc.go
index c1bcc99b5fb..7baa91f76c5 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -27,7 +27,6 @@ import (
 
 	"github.com/dgraph-io/badger/v2"
 	bpb "github.com/dgraph-io/badger/v2/pb"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgo/v200/protos/api"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
@@ -101,7 +100,7 @@ func (ir *incrRollupi) addKeyToBatch(key []byte) {
 }
 
 // Process will rollup batches of 64 keys in a go routine.
-func (ir *incrRollupi) Process(closer *y.Closer) {
+func (ir *incrRollupi) Process(closer *z.Closer) {
 	defer closer.Done()
 
 	writer := NewTxnWriter(pstore)
diff --git a/raftwal/storage.go b/raftwal/storage.go
index a3ad8bc4b1b..be3e399869e 100644
--- a/raftwal/storage.go
+++ b/raftwal/storage.go
@@ -23,9 +23,9 @@ import (
 	"sync"
 
 	"github.com/dgraph-io/badger/v2"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/glog"
 	"github.com/pkg/errors"
@@ -42,7 +42,7 @@ type DiskStorage struct {
 	elog trace.EventLog
 
 	cache          *sync.Map
-	Closer         *y.Closer
+	Closer         *z.Closer
 	indexRangeChan chan indexRange
 }
 
@@ -57,7 +57,7 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {
 		id:             id,
 		gid:            gid,
 		cache:          new(sync.Map),
-		Closer:         y.NewCloser(1),
+		Closer:         z.NewCloser(1),
 		indexRangeChan: make(chan indexRange, 16),
 	}
 	if prev, err := RaftId(db); err != nil || prev != id {
diff --git a/worker/draft.go b/worker/draft.go
index 8156790414f..ef9fdf65aab 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -38,7 +38,6 @@ import (
 	otrace "go.opencensus.io/trace"
 
 	bpb "github.com/dgraph-io/badger/v2/pb"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
 	"github.com/dgraph-io/dgraph/posting"
@@ -47,6 +46,7 @@ import (
 	"github.com/dgraph-io/dgraph/schema"
 	"github.com/dgraph-io/dgraph/types"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 )
 
 type node struct {
@@ -60,12 +60,12 @@ type node struct {
 	applyCh chan []*pb.Proposal
 	ctx     context.Context
 	gid     uint32
-	closer  *y.Closer
+	closer  *z.Closer
 
 	streaming int32 // Used to avoid calculating snapshot
 
 	// Used to track the ops going on in the system.
-	ops     map[op]*y.Closer
+	ops     map[op]*z.Closer
 	opsLock sync.Mutex
 
 	canCampaign bool
@@ -107,7 +107,7 @@ const (
 // Restore operations have preference and cancel all other operations, not just rollups.
 // You should only call Done() on the returned closer. Calling other functions (such as
 // SignalAndWait) for closer could result in panics. For more details, see GitHub issue #5034.
-func (n *node) startTask(id op) (*y.Closer, error) {
+func (n *node) startTask(id op) (*z.Closer, error) {
 	n.opsLock.Lock()
 	defer n.opsLock.Unlock()
 
@@ -126,7 +126,7 @@ func (n *node) startTask(id op) (*y.Closer, error) {
 		}
 	}
 
-	closer := y.NewCloser(1)
+	closer := z.NewCloser(1)
 	switch id {
 	case opRollup:
 		if len(n.ops) > 0 {
@@ -172,7 +172,7 @@ func (n *node) startTask(id op) (*y.Closer, error) {
 	n.ops[id] = closer
 	glog.Infof("Operation started with id: %s", id)
 
-	go func(id op, closer *y.Closer) {
+	go func(id op, closer *z.Closer) {
 		closer.Wait()
 		stopTask(id)
 	}(id, closer)
@@ -239,8 +239,8 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
 		// to maintain quorum health.
 		applyCh: make(chan []*pb.Proposal, 1000),
 		elog:    trace.NewEventLog("Dgraph", "ApplyCh"),
-		closer:  y.NewCloser(4), // Matches CLOSER:1
-		ops:     make(map[op]*y.Closer),
+		closer:  z.NewCloser(4), // Matches CLOSER:1
+		ops:     make(map[op]*z.Closer),
 	}
 	if x.WorkerConfig.LudicrousMode {
 		n.ex = newExecutor(&m.Applied)
@@ -604,7 +604,7 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
 		defer x.UpdateDrainingMode(false)
 
 		var err error
-		var closer *y.Closer
+		var closer *z.Closer
 		closer, err = n.startTask(opRestore)
 		if err != nil {
 			return errors.Wrapf(err, "cannot start restore task")
@@ -1010,7 +1010,7 @@ func (n *node) Run() {
 	go n.ReportRaftComms()
 
 	if x.WorkerConfig.LudicrousMode {
-		closer := y.NewCloser(2)
+		closer := z.NewCloser(2)
 		defer closer.SignalAndWait()
 		go x.StoreSync(n.Store, closer)
 		go x.StoreSync(pstore, closer)
diff --git a/worker/executor.go b/worker/executor.go
index 8e087878ab8..e88acb8b0e1 100644
--- a/worker/executor.go
+++ b/worker/executor.go
@@ -26,6 +26,8 @@ import (
 	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/protos/pb"
+	"github.com/dgraph-io/ristretto/z"
+
 	"github.com/golang/glog"
 )
 
@@ -42,14 +44,14 @@ type executor struct {
 	sync.RWMutex
 
 	predChan map[string]chan *subMutation
-	closer   *y.Closer
+	closer   *z.Closer
 	applied  *y.WaterMark
 }
 
 func newExecutor(applied *y.WaterMark) *executor {
 	ex := &executor{
 		predChan: make(map[string]chan *subMutation),
-		closer:   y.NewCloser(0),
+		closer:   z.NewCloser(0),
 		applied:  applied,
 	}
 	go ex.shutdown()
diff --git a/worker/groups.go b/worker/groups.go
index 690c702c6c2..46be4f8a43e 100644
--- a/worker/groups.go
+++ b/worker/groups.go
@@ -27,7 +27,6 @@ import (
 
 	"github.com/dgraph-io/badger/v2"
 	badgerpb "github.com/dgraph-io/badger/v2/pb"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgo/v200/protos/api"
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/ee/enc"
@@ -35,6 +34,7 @@ import (
 	"github.com/dgraph-io/dgraph/raftwal"
 	"github.com/dgraph-io/dgraph/schema"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/golang/glog"
 	"github.com/golang/protobuf/proto"
 	"github.com/pkg/errors"
@@ -51,7 +51,7 @@ type groupi struct {
 	tablets      map[string]*pb.Tablet
 	triggerCh    chan struct{} // Used to trigger membership sync
 	blockDeletes *sync.Mutex   // Ensure that deletion won't happen when move is going on.
-	closer       *y.Closer
+	closer       *z.Closer
 
 	// Group checksum is used to determine if the tablets served by the groups have changed from
 	// the membership information that the Alpha has. If so, Alpha cannot service a read.
@@ -154,7 +154,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
 	x.UpdateHealthStatus(true)
 	glog.Infof("Server is ready")
 
-	gr.closer = y.NewCloser(3) // Match CLOSER:1 in this file.
+	gr.closer = z.NewCloser(3) // Match CLOSER:1 in this file.
 	go gr.sendMembershipUpdates()
 	go gr.receiveMembershipUpdates()
 	go gr.processOracleDeltaStream()
@@ -1082,7 +1082,7 @@ func askZeroForEE() bool {
 
 // SubscribeForUpdates will listen for updates for the given group.
 func SubscribeForUpdates(prefixes [][]byte, cb func(kvs *badgerpb.KVList), group uint32,
-	closer *y.Closer) {
+	closer *z.Closer) {
 	defer closer.Done()
 
 	for {
diff --git a/worker/mutation.go b/worker/mutation.go
index 4bf39ec345b..0a958d7e215 100644
--- a/worker/mutation.go
+++ b/worker/mutation.go
@@ -29,7 +29,6 @@ import (
 	otrace "go.opencensus.io/trace"
 
 	"github.com/dgraph-io/badger/v2"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgo/v200"
 	"github.com/dgraph-io/dgo/v200/protos/api"
 	"github.com/dgraph-io/dgraph/conn"
@@ -38,6 +37,7 @@ import (
 	"github.com/dgraph-io/dgraph/schema"
 	"github.com/dgraph-io/dgraph/types"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 )
 
 var (
@@ -151,7 +151,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
 	// done is used to ensure that we only stop the indexing task once.
 	var done uint32
 	start := time.Now()
-	stopIndexing := func(closer *y.Closer) {
+	stopIndexing := func(closer *z.Closer) {
 		// runSchemaMutation can return. stopIndexing could be called by goroutines.
 		if !schema.State().IndexingInProgress() {
 			if atomic.CompareAndSwapUint32(&done, 0, 1) {
diff --git a/worker/server_state.go b/worker/server_state.go
index bca0dea26af..5bf6e82327e 100644
--- a/worker/server_state.go
+++ b/worker/server_state.go
@@ -24,9 +24,9 @@ import (
 
 	"github.com/dgraph-io/badger/v2"
 	"github.com/dgraph-io/badger/v2/options"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/golang/glog"
 )
 
@@ -36,7 +36,7 @@ type ServerState struct {
 	Pstore   *badger.DB
 	WALstore *badger.DB
-	gcCloser *y.Closer // closer for valueLogGC
+	gcCloser *z.Closer // closer for valueLogGC
 
 	needTs chan tsReq
 }
@@ -179,7 +179,7 @@ func (s *ServerState) initStorage() {
 		opt.EncryptionKey = nil
 	}
 
-	s.gcCloser = y.NewCloser(2)
+	s.gcCloser = z.NewCloser(2)
 	go x.RunVlogGC(s.Pstore, s.gcCloser)
 	go x.RunVlogGC(s.WALstore, s.gcCloser)
 }
diff --git a/x/x.go b/x/x.go
index 0e99ab58bf0..d8d0ab28450 100644
--- a/x/x.go
+++ b/x/x.go
@@ -40,9 +40,9 @@ import (
 	"google.golang.org/grpc/peer"
 
 	"github.com/dgraph-io/badger/v2"
-	"github.com/dgraph-io/badger/v2/y"
 	"github.com/dgraph-io/dgo/v200"
 	"github.com/dgraph-io/dgo/v200/protos/api"
+	"github.com/dgraph-io/ristretto/z"
 
 	"github.com/golang/glog"
 	"github.com/pkg/errors"
@@ -943,7 +943,7 @@ func IsGuardian(groups []string) bool {
 
 // RunVlogGC runs value log gc on store. It runs GC unconditionally after every 10 minutes.
 // Additionally it also runs GC if vLogSize has grown more than 1 GB in last minute.
-func RunVlogGC(store *badger.DB, closer *y.Closer) {
+func RunVlogGC(store *badger.DB, closer *z.Closer) {
 	defer closer.Done()
 	// Get initial size on start.
 	_, lastVlogSize := store.Size()
@@ -984,7 +984,7 @@ type DB interface {
 	Sync() error
 }
 
-func StoreSync(db DB, closer *y.Closer) {
+func StoreSync(db DB, closer *z.Closer) {
 	defer closer.Done()
 	ticker := time.NewTicker(1 * time.Second)
 	for {
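Reviewer note: as the call sites above show, ristretto's z.Closer acts as a drop-in replacement for badger's y.Closer here; only the import path and package qualifier change, and the methods used throughout this diff (NewCloser, Done, HasBeenClosed, Wait, SignalAndWait) keep the same signatures. Below is a minimal, self-contained sketch of the signal-and-wait pattern these call sites follow; the worker function, counts, and timings are illustrative, not taken from Dgraph.

package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/ristretto/z"
)

// tick loops until the closer is signalled, then marks one unit of work done.
func tick(id int, closer *z.Closer) {
	defer closer.Done() // Accounts for one unit of z.NewCloser(n).

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Printf("worker %d: tick\n", id)
		case <-closer.HasBeenClosed(): // Closed once Signal()/SignalAndWait() runs.
			fmt.Printf("worker %d: stopping\n", id)
			return
		}
	}
}

func main() {
	closer := z.NewCloser(3) // Three goroutines must call Done() before Wait() returns.
	for i := 0; i < 3; i++ {
		go tick(i, closer)
	}
	time.Sleep(300 * time.Millisecond)
	closer.SignalAndWait() // Signal shutdown, then block until every worker calls Done().
	fmt.Println("all workers stopped")
}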