Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix k8s client #113

Merged
merged 3 commits into from
May 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions paddlecloud/paddlecloud/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,6 @@
PADDLE_BOOK_IMAGE="yancey1989/book-cloud"
PADDLE_BOOK_PORT=8888

if os.getenv("KUBERNETES_SERVICE_HOST", None):
# init kubernete client with service account
config.load_incluster_config()
else:
# init kubernetes client with ~/.kube/config file
config.load_kube_config()

# ============== Datacenter Storage Config Samples ==============
#if Paddle cloud use CephFS as backend storage, configure CEPHFS_CONFIGURATION
#the following is an example:
Expand Down
59 changes: 40 additions & 19 deletions paddlecloud/paddlejob/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def get(self, request, format=None):
"""
username = request.user.username
namespace = notebook.utils.email_escape(username)
job_list = client.BatchV1Api().list_namespaced_job(namespace)
api_instance = client.BatchV1Api(api_client=notebook.utils.get_user_api_client(username))
job_list = api_instance.list_namespaced_job(namespace)
return Response(job_list.to_dict())

def post(self, request, format=None):
Expand All @@ -36,6 +37,7 @@ def post(self, request, format=None):
obj = json.loads(request.body)
topology = obj.get("topology", "")
entry = obj.get("entry", "")
api_client = notebook.utils.get_user_api_client(username)
if not topology and not entry:
return utils.simple_response(500, "no topology or entry specified")
if not obj.get("datacenter"):
Expand Down Expand Up @@ -102,7 +104,7 @@ def post(self, request, format=None):
volumes = volumes
)
try:
ret = client.ExtensionsV1beta1Api().create_namespaced_replica_set(
ret = client.ExtensionsV1beta1Api(api_client=api_client).create_namespaced_replica_set(
namespace,
paddle_job.new_pserver_job(),
pretty=True)
Expand All @@ -112,7 +114,7 @@ def post(self, request, format=None):

#submit trainer job, it's Kubernetes Job
try:
ret = client.BatchV1Api().create_namespaced_job(
ret = client.BatchV1Api(api_client=api_client).create_namespaced_job(
namespace,
paddle_job.new_trainer_job(),
pretty=True)
Expand All @@ -129,6 +131,7 @@ def delete(self, request, format=None):
namespace = notebook.utils.email_escape(username)
obj = json.loads(request.body)
jobname = obj.get("jobname")
api_client = notebook.utils.get_user_api_client(username)
if not jobname:
return utils.simple_response(500, "must specify jobname")
# FIXME: options needed: grace_period_seconds, orphan_dependents, preconditions
Expand All @@ -137,34 +140,42 @@ def delete(self, request, format=None):
# delete job
trainer_name = jobname + "-trainer"
try:
u_status = client.BatchV1Api().delete_namespaced_job(trainer_name, namespace, {})
u_status = client.BatchV1Api(api_client=api_client)\
.delete_namespaced_job(trainer_name, namespace, {})
except ApiException, e:
logging.error("error deleting job: %s, %s", jobname, str(e))
delete_status.append(str(e))

# delete job pods
try:
job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname)
job_pod_list = client.CoreV1Api(api_client=api_client)\
.list_namespaced_pod(namespace,
label_selector="paddle-job=%s"%jobname)
for i in job_pod_list.items:
u_status = client.CoreV1Api().delete_namespaced_pod(i.metadata.name, namespace, {})
u_status = client.CoreV1Api(api_client=api_client)\
.delete_namespaced_pod(i.metadata.name, namespace, {})
except ApiException, e:
logging.error("error deleting job pod: %s", str(e))
delete_status.append(str(e))

# delete pserver rs
pserver_name = jobname + "-pserver"
try:
u_status = client.ExtensionsV1beta1Api().delete_namespaced_replica_set(pserver_name, namespace, {})
u_status = client.ExtensionsV1beta1Api(api_client=api_client)\
.delete_namespaced_replica_set(pserver_name, namespace, {})
except ApiException, e:
logging.error("error deleting pserver: %s" % str(e))
delete_status.append(str(e))

# delete pserver pods
try:
# pserver replica set has label with jobname
job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job-pserver=%s"%jobname)
job_pod_list = client.CoreV1Api(api_client=api_client)\
.list_namespaced_pod(namespace,
label_selector="paddle-job-pserver=%s"%jobname)
for i in job_pod_list.items:
u_status = client.CoreV1Api().delete_namespaced_pod(i.metadata.name, namespace, {})
u_status = client.CoreV1Api(api_client=api_client)\
.delete_namespaced_pod(i.metadata.name, namespace, {})
except ApiException, e:
logging.error("error deleting pserver pods: %s" % str(e))
delete_status.append(str(e))
Expand All @@ -184,25 +195,31 @@ def get(self, request, format=None):
"""
username = request.user.username
namespace = notebook.utils.email_escape(username)

api_client = notebook.utils.get_user_api_client(username)
jobname = request.query_params.get("jobname")
num_lines = request.query_params.get("n")
worker = request.query_params.get("w")
job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname)
job_pod_list = client.CoreV1Api(api_client=api_client)\
.list_namespaced_pod(namespace, label_selector="paddle-job=%s"%jobname)
total_job_log = ""
if not worker:
for i in job_pod_list.items:
total_job_log = "".join((total_job_log, "==========================%s==========================" % i.metadata.name))
if num_lines:
pod_log = client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace, tail_lines=int(num_lines))
pod_log = client.CoreV1Api(api_client=api_client)\
.read_namespaced_pod_log(
i.metadata.name, namespace, tail_lines=int(num_lines))
else:
pod_log = client.CoreV1Api().read_namespaced_pod_log(i.metadata.name, namespace)
pod_log = client.CoreV1Api(api_client=api_client)\
.read_namespaced_pod_log(i.metadata.name, namespace)
total_job_log = "\n".join((total_job_log, pod_log))
else:
if num_lines:
pod_log = client.CoreV1Api().read_namespaced_pod_log(worker, namespace, tail_lines=int(num_lines))
pod_log = client.CoreV1Api(api_client=api_client)\
.read_namespaced_pod_log(worker, namespace, tail_lines=int(num_lines))
else:
pod_log = client.CoreV1Api().read_namespaced_pod_log(worker, namespace)
pod_log = client.CoreV1Api(api_client=api_client)\
.read_namespaced_pod_log(worker, namespace)
total_job_log = pod_log
return utils.simple_response(200, total_job_log)

Expand All @@ -217,11 +234,14 @@ def get(self, request, format=None):
namespace = notebook.utils.email_escape(username)
jobname = request.query_params.get("jobname")
job_pod_list = None
api_client = notebook.utils.get_user_api_client(username)
if not jobname:
job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace)
job_pod_list = client.CoreV1Api(api_client=api_client)\
.list_namespaced_pod(namespace)
else:
selector = "paddle-job=%s"%jobname
job_pod_list = client.CoreV1Api().list_namespaced_pod(namespace, label_selector=selector)
job_pod_list = client.CoreV1Api(api_client=api_client)\
.list_namespaced_pod(namespace, label_selector=selector)
return Response(job_pod_list.to_dict())

class QuotaView(APIView):
Expand All @@ -233,6 +253,7 @@ def get(self, request, format=None):
"""
username = request.user.username
namespace = notebook.utils.email_escape(username)

quota_list = client.CoreV1Api().list_namespaced_resource_quota(namespace)
api_client = notebook.utils.get_user_api_client(username)
quota_list = api_client.CoreV1Api(api_cilent=api_client)\
.list_namespaced_resource_quota(namespace)
return Response(quota_list.to_dict())