Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
johanbrandhorst committed Oct 25, 2024
1 parent ffd0d2a commit da3b915
Show file tree
Hide file tree
Showing 13 changed files with 1,154 additions and 537 deletions.
452 changes: 300 additions & 152 deletions internal/daemon/cluster/handlers/worker_service.go

Large diffs are not rendered by default.

29 changes: 1 addition & 28 deletions internal/daemon/cluster/handlers/worker_service_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func TestStatus(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -228,7 +227,6 @@ func TestStatus(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
JobsRequests: []*pbs.JobChangeRequest{
{
Expand Down Expand Up @@ -336,7 +334,6 @@ func TestStatus(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
JobsRequests: []*pbs.JobChangeRequest{
{
Expand Down Expand Up @@ -414,7 +411,6 @@ func TestStatus(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -628,7 +624,6 @@ func TestStatusSessionClosed(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -797,7 +792,6 @@ func TestStatusDeadConnection(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
}

Expand Down Expand Up @@ -952,7 +946,6 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -992,7 +985,6 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand Down Expand Up @@ -1094,7 +1086,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{},
},
want: &pbs.StatusResponse{
CalculatedUpstreams: []*pbs.UpstreamServer{
Expand All @@ -1104,7 +1095,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand All @@ -1126,7 +1116,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand All @@ -1139,7 +1128,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{w1KeyId, w2KeyId, "unknown"},
},
want: &pbs.StatusResponse{
CalculatedUpstreams: []*pbs.UpstreamServer{
Expand All @@ -1148,10 +1136,7 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{
WorkerKeyIdentifiers: []string{w1KeyId, w2KeyId},
},
WorkerId: worker1.PublicId,
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{},
},
},
Expand All @@ -1164,7 +1149,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{w1KeyId, w2KeyId, "unknown"},
ConnectedUnmappedWorkerKeyIdentifiers: []string{w2KeyId, "unknown"},
ConnectedWorkerPublicIds: []string{w1.GetPublicId(), "unknown"},
},
Expand All @@ -1176,9 +1160,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{
WorkerKeyIdentifiers: []string{w1KeyId, w2KeyId},
},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{
UnmappedWorkerKeyIdentifiers: []string{w2KeyId},
WorkerPublicIds: []string{w1.GetPublicId()},
Expand All @@ -1194,7 +1175,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{w1KeyId, w2KeyId, "unknown"},
ConnectedUnmappedWorkerKeyIdentifiers: []string{"unknown"},
ConnectedWorkerPublicIds: []string{w1.GetPublicId(), w2.GetPublicId(), "unknown"},
},
Expand All @@ -1206,9 +1186,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{
WorkerKeyIdentifiers: []string{w1KeyId, w2KeyId},
},
AuthorizedDownstreamWorkers: &pbs.AuthorizedDownstreamWorkerList{
WorkerPublicIds: []string{w1.GetPublicId(), w2.GetPublicId()},
},
Expand All @@ -1227,10 +1204,6 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
assert.Equal(tc.wantErrMsg, err.Error())
return
}
gotAuthorizedWorkers := got.GetAuthorizedWorkers()
sort.Strings(gotAuthorizedWorkers.GetWorkerKeyIdentifiers())
wantAuthorizedWorkers := tc.want.GetAuthorizedWorkers()
sort.Strings(wantAuthorizedWorkers.GetWorkerKeyIdentifiers())
sort.Strings(got.GetAuthorizedDownstreamWorkers().GetWorkerPublicIds())
sort.Strings(tc.want.GetAuthorizedDownstreamWorkers().GetWorkerPublicIds())
sort.Strings(got.GetAuthorizedDownstreamWorkers().GetUnmappedWorkerKeyIdentifiers())
Expand Down
10 changes: 3 additions & 7 deletions internal/daemon/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess

// Range over known sessions and collect info
sessionManager.ForEachLocalSession(func(s session.Session) bool {
var jobInfo pbs.SessionJobInfo
status := s.GetStatus()
sessionId := s.GetId()
localConnections := s.GetLocalConnections()
Expand All @@ -165,7 +164,6 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
BytesDown: v.BytesDown(),
})
}
jobInfo.SessionId = sessionId
activeJobs = append(activeJobs, &pbs.JobStatus{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_SESSION,
Expand All @@ -181,9 +179,6 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
return true
})

clientCon := w.GrpcClientConn.Load()
// Send status information
client := pbs.NewServerCoordinationServiceClient(clientCon)
var tags []*pb.TagPair
// If we're not going to request a tag update, no reason to have these
// marshaled on every status call.
Expand Down Expand Up @@ -212,6 +207,9 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
}
versionInfo := version.Get()
connectionState := w.downstreamConnManager.Connected()
// Send status information
clientCon := w.GrpcClientConn.Load()
client := pbs.NewServerCoordinationServiceClient(clientCon)
result, err := client.Status(statusCtx, &pbs.StatusRequest{
Jobs: activeJobs,
WorkerStatus: &pb.ServerWorkerStatus{
Expand Down Expand Up @@ -301,8 +299,6 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
if authorized := result.GetAuthorizedDownstreamWorkers(); authorized != nil {
connectionState.DisconnectMissingWorkers(authorized.GetWorkerPublicIds())
connectionState.DisconnectMissingUnmappedKeyIds(authorized.GetUnmappedWorkerKeyIdentifiers())
} else if authorized := result.GetAuthorizedWorkers(); authorized != nil {
connectionState.DisconnectAllMissingKeyIds(authorized.GetWorkerKeyIdentifiers())
}
var addrs []string
// This may be empty if we are in a multiple hop scenario
Expand Down
58 changes: 32 additions & 26 deletions internal/gen/controller/servers/servers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit da3b915

Please sign in to comment.