Rework log collector for GKE clusters (#222)
Rework log collector for services
ib-steffen authored Dec 1, 2018
1 parent 780c872 commit c54247a
Showing 9 changed files with 287 additions and 50 deletions.
2 changes: 1 addition & 1 deletion ib.py
@@ -20,7 +20,7 @@
{'name': 'gerrit-review', 'depends_on': ['images-base']},
{'name': 'github-trigger', 'depends_on': ['images-base']},
{'name': 'github-review', 'depends_on': ['images-base']},
{'name': 'collector-api'},
{'name': 'collector-api', 'depends_on': ['images-base']},
{'name': 'job'},
{'name': 'opa'},
{'name': 'gc', 'depends_on': ['images-base']},
10 changes: 10 additions & 0 deletions src/api/handlers/job_api.py
@@ -480,6 +480,16 @@ class Archive(Resource):
def post(self):
job_id = g.token['job']['id']

j = g.db.execute_one_dict('''
SELECT id
FROM job
WHERE id = %s
AND (state = 'running' OR end_date > NOW() - INTERVAL '5 minutes')
''', [job_id])

if not j:
abort(401, 'Unauthorized')

for f in request.files:
stream = request.files[f].stream
key = '%s/%s' % (job_id, f)
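With this check, uploads to the archive are only accepted while the job is still running or for up to five minutes after it ended. A minimal sketch of how a collector service could push a captured log through this endpoint; the requests-based client, the Authorization header format, and the helper name are assumptions, not part of this commit:

    import requests

    def upload_log_to_archive(api_url, job_token, name, path):
        # Hypothetical helper: POST one collected log file to /api/job/archive.
        # The handler above rejects the upload unless the job is still running
        # or ended less than five minutes ago.
        with open(path, 'rb') as f:
            r = requests.post(
                api_url + '/api/job/archive',
                headers={'Authorization': 'token ' + job_token},  # assumed token scheme
                files={name: f},
                timeout=30)
        r.raise_for_status()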
4 changes: 0 additions & 4 deletions src/api/handlers/projects/jobs.py
@@ -307,15 +307,11 @@ def get(self, project_id, job_id):
# First restart
j['name'] = j['name'] + '.1'

logger.error(json.dumps(old_id_job, indent=4))

for j in jobs:
for dep in j['dependencies']:
if dep['job-id'] in old_id_job:
dep['job'] = old_id_job[dep['job-id']]['name']
dep['job-id'] = old_id_job[dep['job-id']]['id']
else:
logger.error('%s not found', dep['job'])

for j in jobs:
g.db.execute('''
12 changes: 9 additions & 3 deletions src/collector-api/server.py
@@ -26,6 +26,9 @@ def get(self):
return {'status': 200}

def handle_entry(entry):
if 'kubernetes' not in entry:
return

e = entry['kubernetes']
pod_path = os.path.join(storage_path, e['pod_id'])

@@ -35,11 +38,9 @@ def handle_entry(entry):
metadata_path = os.path.join(pod_path, "metadata.json")
log_path = os.path.join(pod_path, e['container_name'] +".log")


if not os.path.exists(metadata_path):
with open(metadata_path, 'w+') as metadata_file:
md = {
'namespace_id': e['namespace_id'],
'namespace_name': e['namespace_name'],
'pod_id': e['pod_id'],
'pod_name': e['pod_name'],
@@ -58,7 +59,9 @@

if 'log' in entry:
with open(log_path, 'a+') as log_file:
log_file.write(entry['log'])
log = entry['log']
log = log.replace('\x00', '\n')
log_file.write(log)

@api.route('/api/log')
class Console(Resource):
@@ -114,6 +117,9 @@ def get(self, pod_id, container_name):
def main(): # pragma: no cover
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 * 4

if not os.path.exists(storage_path):
os.makedirs(storage_path)

port = int(os.environ.get('INFRABOX_PORT', 8080))
logger.info('Starting Server on port %s', port)
app.run(host='0.0.0.0', port=port)
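For context, handle_entry consumes the JSON records that fluent-bit's http output posts after the kubernetes filter has annotated them. A sketch of the assumed record shape; the key names follow the kubernetes filter, the values are invented, and the elided part of the handler may read additional keys:

    sample_entry = {
        # NUL bytes in 'log' are rewritten to newlines before being appended to
        # <storage_path>/<pod_id>/<container_name>.log
        'log': 'build step finished\x00',
        'kubernetes': {
            'namespace_name': 'infrabox-worker',
            'pod_id': '4f1c0d9e-uid',
            'pod_name': 'ib-job-1234',
            'container_name': 'run-job',
        },
    }

Records without a 'kubernetes' key are dropped; the first record seen for a pod also creates <storage_path>/<pod_id>/metadata.json.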
1 change: 1 addition & 0 deletions src/dashboard-client/src/models/Job.js
@@ -3,6 +3,7 @@ import Notification from '../models/Notification'
import NotificationService from '../services/NotificationService'
import NewAPIService from '../services/NewAPIService'
import store from '../store'
import router from '../router'
const Convert = require('ansi-to-html')

class Section {
11 changes: 9 additions & 2 deletions src/openpolicyagent/policies/job.rego
@@ -57,8 +57,15 @@ allow {
allow {
api.method = "POST"
api.path = ["api", "job", suffix]
job_suffix := {"cache", "archive", "output", "create_jobs", "consoleupdate", "stats", "markup", "badge", "testresult"}
job_suffix := {"cache", "output", "create_jobs", "consoleupdate", "stats", "markup", "badge", "testresult"}
suffix = job_suffix[_]
api.token.type = "job"
api.token.job.state = job_state[_]
}
}

# Allow POST access to /api/job/archive for valid job tokens (for service uploads)
allow {
api.method = "POST"
api.path = ["api", "job", "archive"]
api.token.type = "job"
}
96 changes: 82 additions & 14 deletions src/services/aks/pkg/controller/akscluster/akscluster_controller.go
@@ -514,8 +514,6 @@ func retrieveLogs(cr *v1alpha1.AKSCluster, cluster *RemoteCluster, log *logrus.E
return
}

log.Info(string(*data))

err = json.Unmarshal(*data, &pods)
if err != nil {
log.Errorf("Failed to collected pod list: %v", err)
@@ -532,7 +530,7 @@ func retrieveLogs(cr *v1alpha1.AKSCluster, cluster *RemoteCluster, log *logrus.E
continue
}

filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + pod.PodID + ".txt"
filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt"
err = uploadToArchive(cr, log, data, filename)
if err != nil {
log.Warningf("Failed to upload log to archive: %v", err)
@@ -568,6 +566,12 @@ func injectCollector(cluster *RemoteCluster, log *logrus.Entry) error {
return err
}

err = kubectlApply(cluster, newFluentbitConfigMap(), log)
if err != nil {
log.Errorf("Failed to create fluent bit config map: %v", err)
return err
}

err = kubectlApply(cluster, newCollectorDaemonSet(), log)
if err != nil {
log.Errorf("Failed to create collector daemon set: %v", err)
@@ -691,30 +695,83 @@ func newCollectorDeployment() *appsv1.Deployment {
}
}

func newFluentbitConfigMap() *v1.ConfigMap{
return &v1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "infrabox-fluent-bit",
Namespace: "infrabox-collector",
},
Data: map[string]string {
"parsers.conf": `
[PARSER]
Name docker_utf8
Format json
Time_Key time
Time_Format %Y-%m-%dT%H:%M:%S.%L
Time_Keep On
Decode_Field_as escaped_utf8 log do_next
Decode_Field_as escaped log
`,
"fluent-bit.conf": `
[SERVICE]
Flush 2
Daemon Off
Log_Level info
Parsers_File parsers.conf
[INPUT]
Name tail
Path /var/log/containers/*.log
Parser docker_utf8
Tag kube.*
Refresh_Interval 2
Mem_Buf_Limit 50MB
Skip_Long_Lines On
[FILTER]
Name kubernetes
Match kube.*
Kube_URL https://kubernetes.default.svc.cluster.local:443
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
[OUTPUT]
Name http
Match *
Host infrabox-collector-api.infrabox-collector
Port 80
URI /api/log
Format json
`,
},
}
}

func newCollectorDaemonSet() *appsv1.DaemonSet {
return &appsv1.DaemonSet{
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",
APIVersion: "extensions/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "infrabox-collector-fluentd",
Name: "infrabox-collector-fluent-bit",
Namespace: "infrabox-collector",
},
Spec: appsv1.DaemonSetSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "fluentd.collector.infrabox.net",
"app": "fluentbit.collector.infrabox.net",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "fluentd",
Image: "quay.io/infrabox/collector-fluentd",
Name: "fluent-bit",
Image: "fluent/fluent-bit:0.13",
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
"memory": resource.MustParse("200Mi"),
"memory": resource.MustParse("100Mi"),
},
Requests: v1.ResourceList{
"cpu": resource.MustParse("100m"),
@@ -728,10 +785,14 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
Name: "varlibdockercontainers",
MountPath: "/var/lib/docker/containers",
ReadOnly: true,
}},
Env: []v1.EnvVar{{
Name: "INFRABOX_COLLECTOR_ENDPOINT",
Value: "http://infrabox-collector-api.infrabox-collector/api/log",
}, {
Name: "config",
MountPath: "/fluent-bit/etc/parsers.conf",
SubPath: "parsers.conf",
}, {
Name: "config",
MountPath: "/fluent-bit/etc/fluent-bit.conf",
SubPath: "fluent-bit.conf",
}},
}},
Volumes: []v1.Volume{{
@@ -748,11 +809,18 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
Path: "/var/log",
},
},
}, {
Name: "config",
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: "infrabox-fluent-bit",
},
},
},
}},
},
},
},
}
}
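After injectCollector has applied these manifests to the remote cluster, the result can be spot-checked from outside, for example with the Python Kubernetes client. This is only a verification sketch; it assumes a kubeconfig whose current context points at the remote cluster and that the cluster serves the DaemonSet through the apps/v1 API:

    from kubernetes import client, config

    config.load_kube_config()  # assumes the current context is the remote cluster

    core = client.CoreV1Api()
    apps = client.AppsV1Api()

    # Names and namespace match the manifests created above.
    cm = core.read_namespaced_config_map('infrabox-fluent-bit', 'infrabox-collector')
    ds = apps.read_namespaced_daemon_set('infrabox-collector-fluent-bit', 'infrabox-collector')

    print(sorted(cm.data.keys()))            # expect ['fluent-bit.conf', 'parsers.conf']
    print('fluent-bit pods ready:', ds.status.number_ready)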

// newPodForCR returns a busybox pod with the same name/namespace as the cr
(remaining changed files not shown)
