Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(log): make dev-mode work in Knative #280

Merged
merged 1 commit into from
Dec 11, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions pkg/util/log/pod_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
"k8s.io/apimachinery/pkg/watch"
)

var commonUserContainerNames = map[string]bool{
// Convention used in Knative and Istio
"user-container": true,
}

// PodScraper scrapes logs of a specific pod
type PodScraper struct {
namespace string
Expand Down Expand Up @@ -62,13 +67,16 @@ func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
}

func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) {
err := s.waitForPodRunning(ctx, s.namespace, s.name)
containerName, err := s.waitForPodRunning(ctx, s.namespace, s.name)
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
}

byteReader, err := k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, &v1.PodLogOptions{Follow: true}).Context(ctx).Stream()
logOptions := v1.PodLogOptions{
Follow: true,
Container: containerName,
}
byteReader, err := k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, &logOptions).Context(ctx).Stream()
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
Expand Down Expand Up @@ -100,14 +108,14 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.
}

if ctx.Err() != nil {
logrus.Info("Pod ", s.name, " will no longer be monitored")
logrus.Debug("Pod ", s.name, " will no longer be monitored")
if err := clientCloser(); err != nil {
logrus.Warn("Unable to close the client", err)
}
return
}

logrus.Info("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
logrus.Debug("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
select {
case <-time.After(wait):
break
Expand All @@ -121,8 +129,9 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.
s.doScrape(ctx, out, clientCloser)
}

// Waits for a given pod to reach the running state
func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) error {
// waitForPodRunning waits for a given pod to reach the running state.
// It may return the internal container to watch if present
func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) (string, error) {
pod := v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
Expand All @@ -135,22 +144,22 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
}
resourceClient, _, err := k8sclient.GetResourceClient(pod.APIVersion, pod.Kind, pod.Namespace)
if err != nil {
return err
return "", err
}
watcher, err := resourceClient.Watch(metav1.ListOptions{
FieldSelector: "metadata.name=" + pod.Name,
})
if err != nil {
return err
return "", err
}
events := watcher.ResultChan()
for {
select {
case <-ctx.Done():
return ctx.Err()
return "", ctx.Err()
case e, ok := <-events:
if !ok {
return errors.New("event channel closed")
return "", errors.New("event channel closed")
}

if e.Object != nil {
Expand All @@ -161,19 +170,34 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
pcopy := pod.DeepCopy()
err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, pcopy)
if err != nil {
return err
return "", err
}

if pcopy.Status.Phase == v1.PodRunning {
return nil
return s.chooseContainer(pcopy), nil
}
}
} else if e.Type == watch.Deleted || e.Type == watch.Error {
return errors.New("unable to watch pod " + s.name)
return "", errors.New("unable to watch pod " + s.name)
}
case <-time.After(30 * time.Second):
return errors.New("no state change after 30 seconds for pod " + s.name)
return "", errors.New("no state change after 30 seconds for pod " + s.name)
}
}

}

func (s *PodScraper) chooseContainer(p *v1.Pod) string {
if p != nil {
if len(p.Spec.Containers) == 1 {
// Let Kubernetes auto-detect
return ""
}
for _, c := range p.Spec.Containers {
if _, ok := commonUserContainerNames[c.Name]; ok {
return c.Name
}
}
}
return ""
}