forked from unraveldata-org/tagging
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tag_realuser.py
66 lines (52 loc) · 1.83 KB
/
tag_realuser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Queue names that start with this prefix are eligible for a group tag.
queue_prefix = "root."
queue_prefix_length = len(queue_prefix)


def get_group_tag(queue):
    """Derive a group tag from a queue name that starts with ``queue_prefix``.

    With the prefix "root.":
      * "root.FOO"        -> "FOO"   (the only '.' belongs to the prefix)
      * "root.FOO.<user>" -> "FOO"   (the last dotted segment is dropped)
      * "root.a.b.c"      -> "a.b"
      * "root."           -> ""

    Args:
        queue: the queue name, or None / empty string.

    Returns:
        The group string, or None when ``queue`` is empty/None or does not
        start with ``queue_prefix``.
    """
    if not queue or not queue.startswith(queue_prefix):
        return None

    last_dot = queue.rfind('.')
    if last_dot == -1:
        return None

    # A last '.' inside the prefix means there is no trailing segment to drop,
    # so keep everything after the prefix; otherwise cut at the last '.'.
    if last_dot >= queue_prefix_length:
        end = last_dot
    else:
        end = len(queue)
    return queue[queue_prefix_length:end]
def normalize_username(user):
    """Strip everything from the first '@' onward in a user name.

    For example "alice@EXAMPLE.COM" -> "alice". A name without '@' is
    returned as-is.

    Args:
        user: the user name string, or None / empty string.

    Returns:
        The portion of ``user`` before the first '@'. ``None`` and ``""``
        are returned unchanged rather than raising AttributeError, since a
        caller may pass an application's user name through unchecked.
    """
    if not user:
        return user
    # split with maxsplit=1 keeps only the text before the first '@'.
    return user.split('@', 1)[0]
def get_mr_job_type(app_obj):
    """Classify a MapReduce application by inspecting its configuration.

    Checks are applied in order and the first match wins:
      1. "pig.version" set                      -> "mapreduce-pig"
      2. "hive.exec.plan" set                   -> "mapreduce-hive"
      3. "mapred.map.runner.class" equals
         "org.apache.hadoop.streaming.PipeMapRunner" -> "mapreduce-streaming"
      4. "distcp.job.dir" set                   -> "mapreduce-distcp"
      5. "cascading.app.id" set                 -> "mapreduce-cascading"
      otherwise                                 -> "mapreduce-other"

    "Set" means ``app_obj.getAppConf(key)`` returned a truthy value.

    Args:
        app_obj: object exposing ``getAppConf(key)``.

    Returns:
        One of the job-type strings listed above.
    """
    if app_obj.getAppConf("pig.version"):
        return "mapreduce-pig"
    if app_obj.getAppConf("hive.exec.plan"):
        return "mapreduce-hive"
    # A single equality test is enough: an unset (falsy) value can never
    # equal the runner class name, so no separate truthiness lookup is needed.
    if (app_obj.getAppConf("mapred.map.runner.class")
            == "org.apache.hadoop.streaming.PipeMapRunner"):
        return "mapreduce-streaming"
    if app_obj.getAppConf("distcp.job.dir"):
        return "mapreduce-distcp"
    if app_obj.getAppConf("cascading.app.id"):
        return "mapreduce-cascading"
    return "mapreduce-other"
def get_tags(app_obj):
    """Build the tag dictionary for an application object.

    Args:
        app_obj: object exposing getQueue(), getAppType(), getAppConf(key)
            and getUsername(), or None.

    Returns:
        None when ``app_obj`` is None. Otherwise a dict containing:
          'realuser' - for apps of type 'mr', the "hive.sentry.subject.name"
                       conf value when truthy; in every other case
                       app_obj.getUsername(). The value is always passed
                       through normalize_username.
          'jobtype'  - only for apps of type 'mr', from get_mr_job_type.
          'group'    - only when get_group_tag(app_obj.getQueue()) is truthy.
    """
    if app_obj is None:
        return None

    group = get_group_tag(app_obj.getQueue())

    # The Sentry subject and the job type are only looked up for 'mr' apps.
    sentry_user = None
    mr_job_type = None
    if app_obj.getAppType() == 'mr':
        sentry_user = app_obj.getAppConf("hive.sentry.subject.name")
        mr_job_type = get_mr_job_type(app_obj)

    # Prefer the Sentry subject; fall back to the app's own user name.
    tags = {'realuser': normalize_username(sentry_user or app_obj.getUsername())}
    if mr_job_type:
        tags['jobtype'] = mr_job_type
    if group:
        tags['group'] = group
    return tags