Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kamel log): Use integration name for looking up containers #348

Merged
merged 1 commit into from
Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions pkg/util/log/annotation_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,21 @@ import (

// SelectorScraper scrapes all pods with a given selector
type SelectorScraper struct {
client kubernetes.Interface
namespace string
labelSelector string
podScrapers sync.Map
counter uint64
client kubernetes.Interface
namespace string
defaultContainerName string
labelSelector string
podScrapers sync.Map
counter uint64
}

// NewSelectorScraper creates a new SelectorScraper
func NewSelectorScraper(client kubernetes.Interface, namespace string, labelSelector string) *SelectorScraper {
func NewSelectorScraper(client kubernetes.Interface, namespace string, defaultContainerName string, labelSelector string) *SelectorScraper {
return &SelectorScraper{
client: client,
namespace: namespace,
labelSelector: labelSelector,
client: client,
namespace: namespace,
defaultContainerName: defaultContainerName,
labelSelector: labelSelector,
}
}

Expand Down Expand Up @@ -122,17 +124,17 @@ func (s *SelectorScraper) synchronize(ctx context.Context, out *bufio.Writer) er
return nil
}

func (s *SelectorScraper) addPodScraper(ctx context.Context, name string, out *bufio.Writer) {
podScraper := NewPodScraper(s.client, s.namespace, name)
func (s *SelectorScraper) addPodScraper(ctx context.Context, podName string, out *bufio.Writer) {
podScraper := NewPodScraper(s.client, s.namespace, podName, s.defaultContainerName)
podCtx, podCancel := context.WithCancel(ctx)
id := atomic.AddUint64(&s.counter, 1)
prefix := "[" + strconv.FormatUint(id, 10) + "] "
podReader := podScraper.Start(podCtx)
s.podScrapers.Store(name, podCancel)
s.podScrapers.Store(podName, podCancel)
go func() {
defer podCancel()

if _, err := out.WriteString(prefix + "Monitoring pod " + name); err != nil {
if _, err := out.WriteString(prefix + "Monitoring pod " + podName); err != nil {
logrus.Error("Cannot write to output: ", err)
return
}
Expand Down
43 changes: 25 additions & 18 deletions pkg/util/log/pod_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ var commonUserContainerNames = map[string]bool{

// PodScraper scrapes logs of a specific pod
type PodScraper struct {
namespace string
name string
client kubernetes.Interface
namespace string
podName string
defaultContainerName string
client kubernetes.Interface
}

// NewPodScraper creates a new pod scraper
func NewPodScraper(c kubernetes.Interface, namespace string, name string) *PodScraper {
func NewPodScraper(c kubernetes.Interface, namespace string, podName string, defaultContainerName string) *PodScraper {
return &PodScraper{
namespace: namespace,
name: name,
client: c,
namespace: namespace,
podName: podName,
defaultContainerName: defaultContainerName,
client: c,
}
}

Expand All @@ -69,7 +71,7 @@ func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
}

func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) {
containerName, err := s.waitForPodRunning(ctx, s.namespace, s.name)
containerName, err := s.waitForPodRunning(ctx, s.namespace, s.podName, s.defaultContainerName)
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
Expand All @@ -78,7 +80,7 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos
Follow: true,
Container: containerName,
}
byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.name, &logOptions).Context(ctx).Stream()
byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.podName, &logOptions).Context(ctx).Stream()
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
Expand Down Expand Up @@ -106,18 +108,18 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos

func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.Duration, out *bufio.Writer, clientCloser func() error) {
if err != nil {
logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.name))
logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.podName))
}

if ctx.Err() != nil {
logrus.Debug("Pod ", s.name, " will no longer be monitored")
logrus.Debug("Pod ", s.podName, " will no longer be monitored")
if err := clientCloser(); err != nil {
logrus.Warn("Unable to close the client", err)
}
return
}

logrus.Debug("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
logrus.Debug("Retrying to scrape pod ", s.podName, " logs in ", wait.Seconds(), " seconds...")
select {
case <-time.After(wait):
break
Expand All @@ -133,14 +135,14 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.

// waitForPodRunning waits for a given pod to reach the running state.
// It may return the internal container to watch if present
func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) (string, error) {
func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, podName string, defaultContainerName string) (string, error) {
pod := v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: v1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: podName,
Namespace: namespace,
},
}
Expand Down Expand Up @@ -181,29 +183,34 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
}

if recvPod != nil && recvPod.Status.Phase == v1.PodRunning {
return s.chooseContainer(recvPod), nil
return s.chooseContainer(recvPod, defaultContainerName), nil
}
} else if e.Type == watch.Deleted || e.Type == watch.Error {
return "", errors.New("unable to watch pod " + s.name)
return "", errors.New("unable to watch pod " + s.podName)
}
case <-time.After(30 * time.Second):
return "", errors.New("no state change after 30 seconds for pod " + s.name)
return "", errors.New("no state change after 30 seconds for pod " + s.podName)
}
}

}

func (s *PodScraper) chooseContainer(p *v1.Pod) string {
func (s *PodScraper) chooseContainer(p *v1.Pod, defaultContainerName string) string {
if p != nil {
if len(p.Spec.Containers) == 1 {
// Let Kubernetes auto-detect
return ""
}
// Fallback to first container name
containerNameFound := p.Spec.Containers[0].Name
for _, c := range p.Spec.Containers {
if _, ok := commonUserContainerNames[c.Name]; ok {
return c.Name
} else if c.Name == defaultContainerName {
containerNameFound = defaultContainerName
}
}
return containerNameFound
}
return ""
}
2 changes: 1 addition & 1 deletion pkg/util/log/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Print prints integrations logs to the stdout
func Print(ctx context.Context, client kubernetes.Interface, integration *v1alpha1.Integration) error {
scraper := NewSelectorScraper(client, integration.Namespace, "camel.apache.org/integration="+integration.Name)
scraper := NewSelectorScraper(client, integration.Namespace, integration.Name,"camel.apache.org/integration="+integration.Name)
reader := scraper.Start(ctx)

if _, err := io.Copy(os.Stdout, ioutil.NopCloser(reader)); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/log_scrape_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPodLogScrape(t *testing.T) {

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
defer cancel()
scraper := log.NewPodScraper(testClient, pod.Namespace, pod.Name)
scraper := log.NewPodScraper(testClient, pod.Namespace, pod.Name, "scraped")
in := scraper.Start(ctx)

res := make(chan bool)
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestSelectorLogScrape(t *testing.T) {

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
defer cancel()
scraper := log.NewSelectorScraper(testClient, deployment.Namespace, "scrape=me")
scraper := log.NewSelectorScraper(testClient, deployment.Namespace, "main", "scrape=me")
in := scraper.Start(ctx)

res := make(chan string)
Expand Down