Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use NodeSystemInfo rather than node labels #152

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/k8s-netperf/k8s-netperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ var rootCmd = &cobra.Command{

if pavail {
for i, npr := range sr.Results {
if len(npr.ClientNodeInfo.Hostname) > 0 && len(npr.ServerNodeInfo.Hostname) > 0 {
if len(npr.ClientNodeInfo.NodeName) > 0 && len(npr.ServerNodeInfo.NodeName) > 0 {
sr.Results[i].ClientMetrics, _ = metrics.QueryNodeCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ServerMetrics, _ = metrics.QueryNodeCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ClientPodCPU, _ = metrics.TopPodCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime)
Expand Down Expand Up @@ -465,8 +465,6 @@ func executeWorkload(nc config.Config,
npr.EndTime = time.Now()
npr.ClientNodeInfo = s.ClientNodeInfo
npr.ServerNodeInfo = s.ServerNodeInfo
npr.ServerNodeLabels, _ = k8s.GetNodeLabels(s.ClientSet, s.ServerNodeInfo.Hostname)
npr.ClientNodeLabels, _ = k8s.GetNodeLabels(s.ClientSet, s.ClientNodeInfo.Hostname)

return npr
}
Expand Down
108 changes: 55 additions & 53 deletions pkg/archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,36 @@ const ltcyMetric = "usec"

// Doc struct of the JSON document to be indexed
type Doc struct {
UUID string `json:"uuid"`
Timestamp time.Time `json:"timestamp"`
HostNetwork bool `json:"hostNetwork"`
Driver string `json:"driver"`
Parallelism int `json:"parallelism"`
Profile string `json:"profile"`
Duration int `json:"duration"`
Service bool `json:"service"`
Local bool `json:"local"`
Virt bool `json:"virt"`
AcrossAZ bool `json:"acrossAZ"`
Samples int `json:"samples"`
Messagesize int `json:"messageSize"`
Burst int `json:"burst"`
Throughput float64 `json:"throughput"`
Latency float64 `json:"latency"`
TputMetric string `json:"tputMetric"`
LtcyMetric string `json:"ltcyMetric"`
TCPRetransmit float64 `json:"tcpRetransmits"`
UDPLossPercent float64 `json:"udpLossPercent"`
ToolVersion string `json:"toolVersion"`
ToolGitCommit string `json:"toolGitCommit"`
Metadata result.Metadata `json:"metadata"`
ServerNodeCPU metrics.NodeCPU `json:"serverCPU"`
ServerPodCPU []metrics.PodCPU `json:"serverPods"`
ClientNodeCPU metrics.NodeCPU `json:"clientCPU"`
ClientPodCPU []metrics.PodCPU `json:"clientPods"`
ClientNodeLabels map[string]string `json:"clientNodeLabels"`
ServerNodeLabels map[string]string `json:"serverNodeLabels"`
Confidence []float64 `json:"confidence"`
UUID string `json:"uuid"`
Timestamp time.Time `json:"timestamp"`
HostNetwork bool `json:"hostNetwork"`
Driver string `json:"driver"`
Parallelism int `json:"parallelism"`
Profile string `json:"profile"`
Duration int `json:"duration"`
Service bool `json:"service"`
Local bool `json:"local"`
Virt bool `json:"virt"`
AcrossAZ bool `json:"acrossAZ"`
Samples int `json:"samples"`
Messagesize int `json:"messageSize"`
Burst int `json:"burst"`
Throughput float64 `json:"throughput"`
Latency float64 `json:"latency"`
TputMetric string `json:"tputMetric"`
LtcyMetric string `json:"ltcyMetric"`
TCPRetransmit float64 `json:"tcpRetransmits"`
UDPLossPercent float64 `json:"udpLossPercent"`
ToolVersion string `json:"toolVersion"`
ToolGitCommit string `json:"toolGitCommit"`
Metadata result.Metadata `json:"metadata"`
ServerNodeCPU metrics.NodeCPU `json:"serverCPU"`
ServerPodCPU []metrics.PodCPU `json:"serverPods"`
ClientNodeCPU metrics.NodeCPU `json:"clientCPU"`
ClientPodCPU []metrics.PodCPU `json:"clientPods"`
Confidence []float64 `json:"confidence"`
ServerNodeInfo metrics.NodeInfo `json:"serverNodeInfo"`
ClientNodeInfo metrics.NodeInfo `json:"clientNodeInfo"`
}

// Connect returns a client connected to the desired cluster.
Expand Down Expand Up @@ -89,29 +89,31 @@ func BuildDocs(sr result.ScenarioResults, uuid string) ([]interface{}, error) {
}
c := []float64{lo, hi}
d := Doc{
UUID: uuid,
Timestamp: time,
ToolVersion: sr.Version,
ToolGitCommit: sr.GitCommit,
Driver: r.Driver,
HostNetwork: r.HostNetwork,
Parallelism: r.Parallelism,
Profile: r.Profile,
Duration: r.Duration,
Virt: sr.Virt,
Samples: r.Samples,
Service: r.Service,
Messagesize: r.MessageSize,
Burst: r.Burst,
TputMetric: r.Metric,
LtcyMetric: ltcyMetric,
ServerNodeCPU: r.ServerMetrics,
ClientNodeCPU: r.ClientMetrics,
ServerPodCPU: r.ServerPodCPU.Results,
ClientPodCPU: r.ClientPodCPU.Results,
Metadata: sr.Metadata,
AcrossAZ: r.AcrossAZ,
Confidence: c,
UUID: uuid,
Timestamp: time,
ToolVersion: sr.Version,
ToolGitCommit: sr.GitCommit,
Driver: r.Driver,
HostNetwork: r.HostNetwork,
Parallelism: r.Parallelism,
Profile: r.Profile,
Duration: r.Duration,
Virt: sr.Virt,
Samples: r.Samples,
Service: r.Service,
Messagesize: r.MessageSize,
Burst: r.Burst,
TputMetric: r.Metric,
LtcyMetric: ltcyMetric,
ServerNodeCPU: r.ServerMetrics,
ClientNodeCPU: r.ClientMetrics,
ServerPodCPU: r.ServerPodCPU.Results,
ClientPodCPU: r.ClientPodCPU.Results,
Metadata: sr.Metadata,
AcrossAZ: r.AcrossAZ,
Confidence: c,
ClientNodeInfo: r.ClientNodeInfo,
ServerNodeInfo: r.ServerNodeInfo,
}
UDPLossPercent, e := result.Average(r.LossSummary)
if e != nil {
Expand Down
129 changes: 30 additions & 99 deletions pkg/k8s/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -218,7 +219,8 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
return err
}
}
s.ClientNodeInfo, _ = GetPodNodeInfo(client, cdp)
s.ClientNodeInfo, err = GetPodNodeInfo(client, labels.Set(cdp.Labels).String())
return err
}

// Create iperf service
Expand Down Expand Up @@ -431,9 +433,12 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
if err != nil {
return err
}
s.ServerNodeInfo, _ = GetPodNodeInfo(client, sdp)
s.ServerNodeInfo, err = GetPodNodeInfo(client, labels.Set(sdp.Labels).String())
if err != nil {
return err
}
if !s.NodeLocal {
s.ClientNodeInfo, _ = GetPodNodeInfo(client, cdpAcross)
s.ClientNodeInfo, err = GetPodNodeInfo(client, labels.Set(cdpAcross.Labels).String())
}
if err != nil {
return err
Expand All @@ -459,17 +464,17 @@ func launchServerVM(perf *config.PerfScenarios, name string, podAff *corev1.PodA
return err
}
if strings.Contains(name, "host") {
perf.ServerHost, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
perf.ServerHost, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
if err != nil {
return err
}
} else {
perf.Server, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
perf.Server, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
if err != nil {
return err
}
}
perf.ServerNodeInfo, _ = GetNakedPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
perf.ServerNodeInfo, _ = GetPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", serverRole))
return nil
}

Expand All @@ -485,17 +490,17 @@ func launchClientVM(perf *config.PerfScenarios, name string, podAff *corev1.PodA
return err
}
if strings.Contains(name, "host") {
perf.ClientHost, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", name))
perf.ClientHost, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", name))
if err != nil {
return err
}
} else {
perf.ClientAcross, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", name))
perf.ClientAcross, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", name))
if err != nil {
return err
}
}
perf.ClientNodeInfo, _ = GetNakedPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", name))
perf.ClientNodeInfo, _ = GetPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", name))
return nil
}

Expand Down Expand Up @@ -525,7 +530,7 @@ func deployDeployment(client *kubernetes.Clientset, dp DeploymentParams) (corev1
return pods, err
}
// Retrieve pods which match the server/client role labels
pods, err = GetPods(client, dp)
pods, err = GetPods(client, labels.Set(dp.Labels).String())
if err != nil {
return pods, err
}
Expand All @@ -544,7 +549,7 @@ func WaitForReady(c *kubernetes.Clientset, dp DeploymentParams) (bool, error) {
for event := range dw.ResultChan() {
d, ok := event.Object.(*appsv1.Deployment)
if !ok {
fmt.Println("❌ Issue with the Deployment")
log.Error("❌ Issue with the Deployment")
}
if d.Name == dp.Name {
if d.Status.ReadyReplicas == 1 {
Expand Down Expand Up @@ -660,46 +665,8 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv
return dc.Create(context.TODO(), deployment, metav1.CreateOptions{})
}

// GetNodeLabels returns the labels for a specific node
func GetNodeLabels(c *kubernetes.Clientset, node string) (map[string]string, error) {
log.Debugf("Looking for Node labels for node - %s", node)
nodeInfo, err := c.CoreV1().Nodes().Get(context.TODO(), node, metav1.GetOptions{})
if err != nil {
return nil, err
}
return nodeInfo.GetLabels(), nil
}

// GetPodNodeInfo collects the node information for a specific pod
func GetPodNodeInfo(c *kubernetes.Clientset, dp DeploymentParams) (metrics.NodeInfo, error) {
var info metrics.NodeInfo
d, err := c.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{})
if err != nil {
return info, fmt.Errorf("❌ Failure to capture deployment: %v", err)
}
selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return info, fmt.Errorf("❌ Failure to capture deployment label: %v", err)
}
pods, err := c.CoreV1().Pods(dp.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"})
if err != nil {
return info, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
for pod := range pods.Items {
p := pods.Items[pod]
if pods.Items[pod].DeletionTimestamp != nil {
continue
} else {
info.IP = p.Status.HostIP
info.Hostname = p.Spec.NodeName
}
}
log.Debugf("%s Running on %s with IP %s", d.Name, info.Hostname, info.IP)
return info, nil
}

// GetNakedPodNodeInfo collects the node information for a specific pod
func GetNakedPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInfo, error) {
// GetPodNodeInfo collects the node information for a node running a pod with a specific label
func GetPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInfo, error) {
var info metrics.NodeInfo
listOpt := metav1.ListOptions{
LabelSelector: label,
Expand All @@ -709,65 +676,29 @@ func GetNakedPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInf
if err != nil {
return info, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
for pod := range pods.Items {
p := pods.Items[pod]
if pods.Items[pod].DeletionTimestamp != nil {
continue
} else {
info.IP = p.Status.HostIP
info.Hostname = p.Spec.NodeName
}
}
log.Debugf("Machine with lablel %s is Running on %s with IP %s", label, info.Hostname, info.IP)
return info, nil
}

// GetPods searches for a specific set of pods from DeploymentParams.
// It returns a PodList if the deployment is found.
// NOTE: a PodList is returned (rather than a single pod) because the replica count can be updated to be > 1.
func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (corev1.PodList, error) {
d, err := c.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{})
npl := corev1.PodList{}
if err != nil {
return npl, fmt.Errorf("❌ Failure to capture deployment: %v", err)
}
selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
info.NodeName = pods.Items[0].Spec.NodeName
rsevilla87 marked this conversation as resolved.
Show resolved Hide resolved
info.IP = pods.Items[0].Status.HostIP
node, err := c.CoreV1().Nodes().Get(context.TODO(), info.NodeName, metav1.GetOptions{})
if err != nil {
return npl, fmt.Errorf("❌ Failure to capture deployment label: %v", err)
return info, err
}
pods, err := c.CoreV1().Pods(dp.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"})
if err != nil {
return npl, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
for pod := range pods.Items {
if pods.Items[pod].DeletionTimestamp != nil {
continue
} else {
npl.Items = append(npl.Items, pods.Items[pod])
}
}
return npl, nil
info.NodeSystemInfo = node.Status.NodeInfo
log.Debugf("Machine with label %s is Running on %s with IP %s", label, info.NodeName, info.IP)
return info, nil
}

// GetNakedPods returns pods that were deployed without a higher-level controller such as a Deployment
func GetNakedPods(c *kubernetes.Clientset, label string) (corev1.PodList, error) {
npl := corev1.PodList{}
// GetPods returns pods with a specific label
func GetPods(c *kubernetes.Clientset, label string) (corev1.PodList, error) {
listOpt := metav1.ListOptions{
LabelSelector: label,
FieldSelector: "status.phase=Running",
}
log.Infof("Looking for pods with label %s", fmt.Sprint(label))
pods, err := c.CoreV1().Pods(namespace).List(context.TODO(), listOpt)
if err != nil {
return npl, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
for pod := range pods.Items {
if pods.Items[pod].DeletionTimestamp != nil {
jtaleric marked this conversation as resolved.
Show resolved Hide resolved
continue
} else {
npl.Items = append(npl.Items, pods.Items[pod])
}
return *pods, fmt.Errorf("❌ Failure to capture pods: %v", err)
}
return npl, nil
return *pods, nil

}

Expand Down
9 changes: 5 additions & 4 deletions pkg/metrics/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"github.com/cloud-bulldozer/go-commons/prometheus"
"github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
"github.com/prometheus/common/model"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/clientcmd"
)

// NodeInfo stores the node metadata like IP and Hostname
type NodeInfo struct {
IP string
Hostname string
NodeName string
IP string `json:"ip"`
NodeName string `json:"nodeName"`
corev1.NodeSystemInfo
}

// NodeCPU stores CPU information for a specific Node
Expand Down Expand Up @@ -127,7 +128,7 @@ func QueryNodeCPU(node NodeInfo, conn PromConnect, start time.Time, end time.Tim
query := fmt.Sprintf("(avg by(mode) (rate(node_cpu_seconds_total{instance=~\"%s:.*\"}[2m])) * 100)", node.IP)
if conn.OpenShift {
// OpenShift changes the instance in its metrics.
query = fmt.Sprintf("(avg by(mode) (rate(node_cpu_seconds_total{instance=~\"%s\"}[2m])) * 100)", node.Hostname)
query = fmt.Sprintf("(avg by(mode) (rate(node_cpu_seconds_total{instance=~\"%s\"}[2m])) * 100)", node.NodeName)
}
logging.Debugf("Prom Query : %s", query)
val, err := conn.Client.QueryRange(query, start, end, time.Minute)
Expand Down
2 changes: 0 additions & 2 deletions pkg/results/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ type Data struct {
ServerMetrics metrics.NodeCPU
ClientPodCPU metrics.PodValues
ServerPodCPU metrics.PodValues
ClientNodeLabels map[string]string
ServerNodeLabels map[string]string
}

// ScenarioResults each scenario could have multiple results
Expand Down
Loading