Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use z.Closer instead of y.Closer #6394

Merged
merged 1 commit into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

var (
Expand Down Expand Up @@ -647,7 +648,7 @@ func (n *Node) WaitLinearizableRead(ctx context.Context) error {
}

// RunReadIndexLoop runs the RAFT index in a loop.
func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) {
func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState) {
defer closer.Done()
readIndex := func(activeRctx []byte) (uint64, error) {
// Read Request can get rejected then we would wait indefinitely on the channel
Expand Down
6 changes: 3 additions & 3 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"sync"
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
"go.opencensus.io/plugin/ocgrpc"
Expand All @@ -50,7 +50,7 @@ type Pool struct {

lastEcho time.Time
Addr string
closer *y.Closer
closer *z.Closer
healthInfo pb.HealthInfo
}

Expand Down Expand Up @@ -175,7 +175,7 @@ func newPool(addr string) (*Pool, error) {
if err != nil {
return nil, err
}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: y.NewCloser(1)}
pl := &Pool{conn: conn, Addr: addr, lastEcho: time.Now(), closer: z.NewCloser(1)}
go pl.MonitorHealth()
return pl, nil
}
Expand Down
16 changes: 8 additions & 8 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"time"

badgerpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/ee/enc"
Expand All @@ -47,6 +46,7 @@ import (
"github.com/dgraph-io/dgraph/tok"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/spf13/cast"
Expand Down Expand Up @@ -389,7 +389,7 @@ func setupListener(addr string, port int) (net.Listener, error) {
return net.Listen("tcp", fmt.Sprintf("%s:%d", addr, port))
}

func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
defer closer.Done()

x.RegisterExporters(Alpha.Conf, "dgraph.alpha")
Expand All @@ -412,7 +412,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
s.Stop()
}

func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
defer closer.Done()
srv := &http.Server{
ReadTimeout: 10 * time.Second,
Expand All @@ -435,7 +435,7 @@ func serveHTTP(l net.Listener, tlsCfg *tls.Config, closer *y.Closer) {
}
}

func setupServer(closer *y.Closer) {
func setupServer(closer *z.Closer) {
go worker.RunServer(bindall) // For pb.communication.

laddr := "localhost"
Expand Down Expand Up @@ -564,7 +564,7 @@ func setupServer(closer *y.Closer) {
http.HandleFunc("/ui/keywords", keywordHandler)

// Initialize the servers.
admin.ServerCloser = y.NewCloser(3)
admin.ServerCloser = z.NewCloser(3)
go serveGRPC(grpcListener, tlsCfg, admin.ServerCloser)
go serveHTTP(httpListener, tlsCfg, admin.ServerCloser)

Expand Down Expand Up @@ -783,7 +783,7 @@ func run() {
}
}()

updaters := y.NewCloser(4)
updaters := z.NewCloser(4)
go func() {
worker.StartRaftNodes(worker.State.WALstore, bindall)
atomic.AddUint32(&initDone, 1)
Expand All @@ -809,7 +809,7 @@ func run() {

// Graphql subscribes to alpha to get schema updates. We need to close that before we
// close alpha. This closer is for closing and waiting that subscription.
adminCloser := y.NewCloser(1)
adminCloser := z.NewCloser(1)

setupServer(adminCloser)
glog.Infoln("GRPC and HTTP stopped.")
Expand All @@ -835,7 +835,7 @@ func run() {
}

// listenForCorsUpdate listen for any cors change and update the accepeted cors.
func listenForCorsUpdate(closer *y.Closer) {
func listenForCorsUpdate(closer *z.Closer) {
prefix := x.DataKey("dgraph.cors", 0)
// Remove uid from the key, to get the correct prefix
prefix = prefix[:len(prefix)-8]
Expand Down
9 changes: 5 additions & 4 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

type reducer struct {
Expand Down Expand Up @@ -333,7 +334,7 @@ func (r *reducer) streamIdFor(pred string) uint32 {
return streamId
}

func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) {
func (r *reducer) encode(entryCh chan *encodeRequest, closer *z.Closer) {
defer closer.Done()
for req := range entryCh {

Expand Down Expand Up @@ -383,7 +384,7 @@ func (r *reducer) writeTmpSplits(ci *countIndexer, kvsCh chan *bpb.KVList, wg *s
x.Check(ci.splitWriter.Flush())
}

func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *y.Closer) {
func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, closer *z.Closer) {
defer closer.Done()

// Concurrently write split lists to a temporary badger.
Expand Down Expand Up @@ -456,14 +457,14 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
fmt.Printf("Num CPUs: %d\n", cpu)
encoderCh := make(chan *encodeRequest, 2*cpu)
writerCh := make(chan *encodeRequest, 2*cpu)
encoderCloser := y.NewCloser(cpu)
encoderCloser := z.NewCloser(cpu)
for i := 0; i < cpu; i++ {
// Start listening to encode entries
// For time being let's lease 100 stream id for each encoder.
go r.encode(encoderCh, encoderCloser)
}
// Start listening to write the badger list.
writerCloser := y.NewCloser(1)
writerCloser := z.NewCloser(1)
go r.startWriting(ci, writerCh, writerCloser)

for i := 0; i < len(partitionKeys); i++ {
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ package zero
import (
"net/http"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/ristretto/z"
)

// dummy function as enterprise features are not available in oss binary.
Expand All @@ -31,7 +31,7 @@ func (n *node) proposeTrialLicense() error {
}

// periodically checks the validity of the enterprise license and updates the membership state.
func (n *node) updateEnterpriseState(closer *y.Closer) {
func (n *node) updateEnterpriseState(closer *z.Closer) {
closer.Done()
}

Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/license_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"net/http"
"time"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
humanize "github.com/dustin/go-humanize"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
Expand Down Expand Up @@ -61,7 +61,7 @@ func (s *Server) expireLicense() {
// periodically checks the validity of the enterprise license and
// 1. Sets license.Enabled to false in membership state if license has expired.
// 2. Prints out warning once every day a week before the license is set to expire.
func (n *node) updateEnterpriseState(closer *y.Closer) {
func (n *node) updateEnterpriseState(closer *z.Closer) {
defer closer.Done()

interval := 5 * time.Second
Expand Down
12 changes: 6 additions & 6 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
farm "github.com/dgryski/go-farm"
"github.com/golang/glog"
"github.com/google/uuid"
Expand All @@ -44,7 +44,7 @@ type node struct {
*conn.Node
server *Server
ctx context.Context
closer *y.Closer // to stop Run.
closer *z.Closer // to stop Run.

// The last timestamp when this Zero was able to reach quorum.
mu sync.RWMutex
Expand Down Expand Up @@ -582,7 +582,7 @@ func (n *node) initAndStartNode() error {
return nil
}

func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {
func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand All @@ -599,7 +599,7 @@ func (n *node) updateZeroMembershipPeriodically(closer *y.Closer) {

var startOption = otrace.WithSampler(otrace.ProbabilitySampler(0.01))

func (n *node) checkQuorum(closer *y.Closer) {
func (n *node) checkQuorum(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -637,7 +637,7 @@ func (n *node) checkQuorum(closer *y.Closer) {
}
}

func (n *node) snapshotPeriodically(closer *y.Closer) {
func (n *node) snapshotPeriodically(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -679,7 +679,7 @@ func (n *node) Run() {
// snapshot can cause select loop to block while deleting entries, so run
// it in goroutine
readStateCh := make(chan raft.ReadState, 100)
closer := y.NewCloser(5)
closer := z.NewCloser(5)
defer func() {
closer.SignalAndWait()
n.closer.Done()
Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import (

"github.com/dgraph-io/badger/v2"
bopt "github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -151,7 +151,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) {
m.Cfg.DisableProposalForwarding = true
st.rs = conn.NewRaftServer(m)

st.node = &node{Node: m, ctx: context.Background(), closer: y.NewCloser(1)}
st.node = &node{Node: m, ctx: context.Background(), closer: z.NewCloser(1)}
st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node}
st.zero.Init()
st.node.server = st.zero
Expand Down Expand Up @@ -302,7 +302,7 @@ func run() {
x.Checkf(err, "Error while opening WAL store")
defer kv.Close()

gcCloser := y.NewCloser(1) // closer for vLogGC
gcCloser := z.NewCloser(1) // closer for vLogGC
go x.RunVlogGC(kv, gcCloser)
defer gcCloser.SignalAndWait()

Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/telemetry"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
"github.com/pkg/errors"
Expand Down Expand Up @@ -66,7 +66,7 @@ type Server struct {
// groupMap map[uint32]*Group
nextGroup uint32
leaderChangeCh chan struct{}
closer *y.Closer // Used to tell stream to close.
closer *z.Closer // Used to tell stream to close.
connectLock sync.Mutex // Used to serialize connect requests from servers.

moveOngoing chan struct{}
Expand All @@ -89,7 +89,7 @@ func (s *Server) Init() {
s.nextTxnTs = 1
s.nextGroup = 1
s.leaderChangeCh = make(chan struct{}, 1)
s.closer = y.NewCloser(2) // grpc and http
s.closer = z.NewCloser(2) // grpc and http
s.blockCommitsOn = new(sync.Map)
s.moveOngoing = make(chan struct{}, 1)

Expand Down
6 changes: 3 additions & 3 deletions edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ package edgraph
import (
"context"

"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
)

Expand All @@ -42,12 +42,12 @@ func (s *Server) Login(ctx context.Context,
}

// ResetAcl is an empty method since ACL is only supported in the enterprise version.
func ResetAcl(closer *y.Closer) {
func ResetAcl(closer *z.Closer) {
// do nothing
}

// ResetAcls is an empty method since ACL is only supported in the enterprise version.
func RefreshAcls(closer *y.Closer) {
func RefreshAcls(closer *z.Closer) {
// do nothing
<-closer.HasBeenClosed()
closer.Done()
Expand Down
7 changes: 3 additions & 4 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
"time"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/ristretto/z"

"github.com/dgraph-io/dgraph/query"

"github.com/pkg/errors"

"github.com/dgraph-io/badger/v2/y"

"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/ee/acl"
"github.com/dgraph-io/dgraph/gql"
Expand Down Expand Up @@ -305,7 +304,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (
}

// RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly.
func RefreshAcls(closer *y.Closer) {
func RefreshAcls(closer *z.Closer) {
defer func() {
glog.Infoln("RefreshAcls closed")
closer.Done()
Expand Down Expand Up @@ -368,7 +367,7 @@ const queryAcls = `
`

// ResetAcl clears the aclCachePtr and upserts the Groot account.
func ResetAcl(closer *y.Closer) {
func ResetAcl(closer *z.Closer) {
defer func() {
glog.Infof("ResetAcl closed")
closer.Done()
Expand Down
Loading