From 52fbe11956ac7af518a9c34496f47f289d29b96b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roland=20Hu=C3=9F?= Date: Mon, 21 Jan 2019 10:50:16 +0100 Subject: [PATCH] fix(kamel log): Use integration name for looking up containers Use integration name for looking up containers as a fallback if no container could be found. If no container could be identified even with the integration name, use the first container for the log, assuming its the "main" container. Fixes #347 --- pkg/util/log/annotation_scraper.go | 28 ++++++++++--------- pkg/util/log/pod_scraper.go | 43 +++++++++++++++++------------ pkg/util/log/util.go | 2 +- test/log_scrape_integration_test.go | 4 +-- 4 files changed, 43 insertions(+), 34 deletions(-) diff --git a/pkg/util/log/annotation_scraper.go b/pkg/util/log/annotation_scraper.go index 31aaba7f01..697c20e210 100644 --- a/pkg/util/log/annotation_scraper.go +++ b/pkg/util/log/annotation_scraper.go @@ -34,19 +34,21 @@ import ( // SelectorScraper scrapes all pods with a given selector type SelectorScraper struct { - client kubernetes.Interface - namespace string - labelSelector string - podScrapers sync.Map - counter uint64 + client kubernetes.Interface + namespace string + defaultContainerName string + labelSelector string + podScrapers sync.Map + counter uint64 } // NewSelectorScraper creates a new SelectorScraper -func NewSelectorScraper(client kubernetes.Interface, namespace string, labelSelector string) *SelectorScraper { +func NewSelectorScraper(client kubernetes.Interface, namespace string, defaultContainerName string, labelSelector string) *SelectorScraper { return &SelectorScraper{ - client: client, - namespace: namespace, - labelSelector: labelSelector, + client: client, + namespace: namespace, + defaultContainerName: defaultContainerName, + labelSelector: labelSelector, } } @@ -122,17 +124,17 @@ func (s *SelectorScraper) synchronize(ctx context.Context, out *bufio.Writer) er return nil } -func (s *SelectorScraper) addPodScraper(ctx context.Context, name string, out *bufio.Writer) { - podScraper := NewPodScraper(s.client, s.namespace, name) +func (s *SelectorScraper) addPodScraper(ctx context.Context, podName string, out *bufio.Writer) { + podScraper := NewPodScraper(s.client, s.namespace, podName, s.defaultContainerName) podCtx, podCancel := context.WithCancel(ctx) id := atomic.AddUint64(&s.counter, 1) prefix := "[" + strconv.FormatUint(id, 10) + "] " podReader := podScraper.Start(podCtx) - s.podScrapers.Store(name, podCancel) + s.podScrapers.Store(podName, podCancel) go func() { defer podCancel() - if _, err := out.WriteString(prefix + "Monitoring pod " + name); err != nil { + if _, err := out.WriteString(prefix + "Monitoring pod " + podName); err != nil { logrus.Error("Cannot write to output: ", err) return } diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go index a0c3c16cd9..93ebc1845e 100644 --- a/pkg/util/log/pod_scraper.go +++ b/pkg/util/log/pod_scraper.go @@ -41,17 +41,19 @@ var commonUserContainerNames = map[string]bool{ // PodScraper scrapes logs of a specific pod type PodScraper struct { - namespace string - name string - client kubernetes.Interface + namespace string + podName string + defaultContainerName string + client kubernetes.Interface } // NewPodScraper creates a new pod scraper -func NewPodScraper(c kubernetes.Interface, namespace string, name string) *PodScraper { +func NewPodScraper(c kubernetes.Interface, namespace string, podName string, defaultContainerName string) *PodScraper { return &PodScraper{ - namespace: namespace, - name: name, - client: c, + namespace: namespace, + podName: podName, + defaultContainerName: defaultContainerName, + client: c, } } @@ -69,7 +71,7 @@ func (s *PodScraper) Start(ctx context.Context) *bufio.Reader { } func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) { - containerName, err := s.waitForPodRunning(ctx, s.namespace, s.name) + containerName, err := s.waitForPodRunning(ctx, s.namespace, s.podName, s.defaultContainerName) if err != nil { s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser) return @@ -78,7 +80,7 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos Follow: true, Container: containerName, } - byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.name, &logOptions).Context(ctx).Stream() + byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.podName, &logOptions).Context(ctx).Stream() if err != nil { s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser) return @@ -106,18 +108,18 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.Duration, out *bufio.Writer, clientCloser func() error) { if err != nil { - logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.name)) + logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.podName)) } if ctx.Err() != nil { - logrus.Debug("Pod ", s.name, " will no longer be monitored") + logrus.Debug("Pod ", s.podName, " will no longer be monitored") if err := clientCloser(); err != nil { logrus.Warn("Unable to close the client", err) } return } - logrus.Debug("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...") + logrus.Debug("Retrying to scrape pod ", s.podName, " logs in ", wait.Seconds(), " seconds...") select { case <-time.After(wait): break @@ -133,14 +135,14 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time. // waitForPodRunning waits for a given pod to reach the running state. // It may return the internal container to watch if present -func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) (string, error) { +func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, podName string, defaultContainerName string) (string, error) { pod := v1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: v1.SchemeGroupVersion.String(), }, ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: podName, Namespace: namespace, }, } @@ -181,29 +183,34 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na } if recvPod != nil && recvPod.Status.Phase == v1.PodRunning { - return s.chooseContainer(recvPod), nil + return s.chooseContainer(recvPod, defaultContainerName), nil } } else if e.Type == watch.Deleted || e.Type == watch.Error { - return "", errors.New("unable to watch pod " + s.name) + return "", errors.New("unable to watch pod " + s.podName) } case <-time.After(30 * time.Second): - return "", errors.New("no state change after 30 seconds for pod " + s.name) + return "", errors.New("no state change after 30 seconds for pod " + s.podName) } } } -func (s *PodScraper) chooseContainer(p *v1.Pod) string { +func (s *PodScraper) chooseContainer(p *v1.Pod, defaultContainerName string) string { if p != nil { if len(p.Spec.Containers) == 1 { // Let Kubernetes auto-detect return "" } + // Fallback to first container name + containerNameFound := p.Spec.Containers[0].Name for _, c := range p.Spec.Containers { if _, ok := commonUserContainerNames[c.Name]; ok { return c.Name + } else if c.Name == defaultContainerName { + containerNameFound = defaultContainerName } } + return containerNameFound } return "" } diff --git a/pkg/util/log/util.go b/pkg/util/log/util.go index eb20eb9aee..2dc91fc4bb 100644 --- a/pkg/util/log/util.go +++ b/pkg/util/log/util.go @@ -30,7 +30,7 @@ import ( // Print prints integrations logs to the stdout func Print(ctx context.Context, client kubernetes.Interface, integration *v1alpha1.Integration) error { - scraper := NewSelectorScraper(client, integration.Namespace, "camel.apache.org/integration="+integration.Name) + scraper := NewSelectorScraper(client, integration.Namespace, integration.Name,"camel.apache.org/integration="+integration.Name) reader := scraper.Start(ctx) if _, err := io.Copy(os.Stdout, ioutil.NopCloser(reader)); err != nil { diff --git a/test/log_scrape_integration_test.go b/test/log_scrape_integration_test.go index 9a74b57261..f233e2921a 100644 --- a/test/log_scrape_integration_test.go +++ b/test/log_scrape_integration_test.go @@ -39,7 +39,7 @@ func TestPodLogScrape(t *testing.T) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) defer cancel() - scraper := log.NewPodScraper(testClient, pod.Namespace, pod.Name) + scraper := log.NewPodScraper(testClient, pod.Namespace, pod.Name, "scraped") in := scraper.Start(ctx) res := make(chan bool) @@ -74,7 +74,7 @@ func TestSelectorLogScrape(t *testing.T) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) defer cancel() - scraper := log.NewSelectorScraper(testClient, deployment.Namespace, "scrape=me") + scraper := log.NewSelectorScraper(testClient, deployment.Namespace, "main", "scrape=me") in := scraper.Start(ctx) res := make(chan string)