This repository has been archived by the owner on Sep 22, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 172
/
server.go
123 lines (107 loc) · 2.54 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package torus
import (
"io"
"sync"
"golang.org/x/net/context"
"github.com/coreos/torus/models"
"github.com/prometheus/client_golang/prometheus"
)
var (
// promOps is a Prometheus counter vector published under the name
// "torus_server_ops_total" and partitioned by a single "kind" label.
// NOTE(review): the Help text describes retried atomic updates, which does
// not obviously match a metric named "ops_total" — it looks copy-pasted.
// Confirm the intended wording before changing it; it is visible at runtime.
promOps = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "torus_server_ops_total",
Help: "Number of times an atomic update failed and needed to be retried",
}, []string{"kind"})
)
// init registers promOps with Prometheus when the package is loaded.
// MustRegister panics if the registration fails.
func init() {
prometheus.MustRegister(promOps)
}
// Context value keys used by Server.ExtendContext to attach the server's
// configured write and read levels to a context.
// NOTE(review): the keys are plain ints, so they can collide with int keys
// used by other packages on the same context; changing the key type would
// alter the exported API, so it is left as is.
const (
// CtxWriteLevel is the key under which Cfg.WriteLevel is stored.
CtxWriteLevel int = iota
// CtxReadLevel is the key under which Cfg.ReadLevel is stored.
CtxReadLevel
)
// Server is the type representing the generic distributed block store.
type Server struct {
// mut is not used by any method visible here.
// NOTE(review): confirm what it guards.
mut sync.RWMutex
// infoMut is held by GetPeerMap while it copies peersMap.
infoMut sync.Mutex
// Blocks, MDS and INodes are the backing components; Close closes each one.
Blocks BlockStore
MDS MetadataService
INodes *INodeStore
// peersMap is the peer table that GetPeerMap snapshots under infoMut.
// Presumably keyed by a peer identifier — verify against the code that fills it.
peersMap map[string]*models.PeerInfo
// closeChans are closed by Close, presumably to signal shutdown to
// whoever is receiving on them.
closeChans []chan interface{}
// Cfg holds the server configuration; ExtendContext reads its WriteLevel
// and ReadLevel.
Cfg Config
peerInfo *models.PeerInfo
// ctx caches the context that getContext builds on first use.
ctx context.Context
// lease is the lease value obtained from the MDS; createOrRenewLease treats
// zero as "no lease yet". Accessed under leaseMut.
lease int64
leaseMut sync.RWMutex
heartbeating bool
ReplicationOpen bool
timeoutCallbacks []func(string)
}
// createOrRenewLease makes sure the server holds a lease from the metadata
// service. If a lease is already recorded (non-zero) it is renewed; when
// there is no lease, or the renewal fails, a new lease is requested and
// stored. The whole operation runs under the leaseMut write lock.
//
// It returns nil after a successful renewal, otherwise whatever error
// GetLease reported (nil on success).
func (s *Server) createOrRenewLease(ctx context.Context) error {
	s.leaseMut.Lock()
	defer s.leaseMut.Unlock()

	if current := s.lease; current != 0 {
		renewErr := s.MDS.WithContext(ctx).RenewLease(current)
		if renewErr == nil {
			return nil
		}
		// Renewal failed: log it and fall through to request a new lease.
		clog.Errorf("Failed to renew, grant new lease for %d: %s", current, renewErr)
	}

	fresh, err := s.MDS.WithContext(ctx).GetLease()
	// The stored lease is overwritten with whatever GetLease returned, even
	// when err is non-nil.
	s.lease = fresh
	return err
}
// Lease returns the lease value currently stored on the server, read under
// the leaseMut read lock.
func (s *Server) Lease() int64 {
	s.leaseMut.RLock()
	current := s.lease
	s.leaseMut.RUnlock()
	return current
}
// Close shuts the server down. It closes every channel in closeChans and then
// closes the metadata service, the inode store and the block store, in that
// order.
//
// All three components are closed even when an earlier one fails, so a single
// failing component no longer leaves the others open. Every failure is
// logged; the first error encountered is returned, or nil if everything
// closed cleanly.
func (s *Server) Close() error {
	for _, c := range s.closeChans {
		close(c)
	}
	// Drop the closed channels so that a repeated Close does not panic by
	// closing them a second time.
	s.closeChans = nil

	var firstErr error
	if err := s.MDS.Close(); err != nil {
		clog.Errorf("couldn't close mds: %s", err)
		firstErr = err
	}
	if err := s.INodes.Close(); err != nil {
		clog.Errorf("couldn't close inodes: %s", err)
		if firstErr == nil {
			firstErr = err
		}
	}
	if err := s.Blocks.Close(); err != nil {
		clog.Errorf("couldn't close blocks: %s", err)
		if firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// Debug writes a bunch of debug output to the io.Writer.
//
// Output is produced only when the metadata service also implements
// DebugMetadataService: a "# MDS" header line is written, followed by the
// service's metadata dump. Otherwise nothing is written and nil is returned.
// A failure to write the header is returned instead of being ignored.
func (s *Server) Debug(w io.Writer) error {
	v, ok := s.MDS.(DebugMetadataService)
	if !ok {
		return nil
	}
	if _, err := io.WriteString(w, "# MDS\n"); err != nil {
		return err
	}
	return v.DumpMetadata(w)
}
// getContext returns the server's cached context, building it on first use
// by extending context.TODO() with ExtendContext.
// The lazy initialisation takes no lock.
func (s *Server) getContext() context.Context {
	if s.ctx != nil {
		return s.ctx
	}
	s.ctx = s.ExtendContext(context.TODO())
	return s.ctx
}
// ExtendContext returns a context derived from ctx that carries the server's
// configured write level under CtxWriteLevel and read level under
// CtxReadLevel.
func (s *Server) ExtendContext(ctx context.Context) context.Context {
	withWrite := context.WithValue(ctx, CtxWriteLevel, s.Cfg.WriteLevel)
	return context.WithValue(withWrite, CtxReadLevel, s.Cfg.ReadLevel)
}
// GetPeerMap returns a snapshot of the server's peer table, taken while
// holding infoMut.
//
// The returned map is a fresh map, so callers may add or remove entries
// without affecting the server. The copy is shallow: the *models.PeerInfo
// values are the same pointers the server holds.
func (s *Server) GetPeerMap() map[string]*models.PeerInfo {
	s.infoMut.Lock()
	defer s.infoMut.Unlock()

	// The final size is known up front, so allocate once rather than growing.
	snapshot := make(map[string]*models.PeerInfo, len(s.peersMap))
	for id, info := range s.peersMap {
		snapshot[id] = info
	}
	return snapshot
}