Skip to content

Commit

Permalink
refactor(controller): refine the abstract layer between job and k8s (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
anda-ren authored Aug 11, 2023
1 parent 69d71b9 commit 34eac83
Show file tree
Hide file tree
Showing 101 changed files with 3,152 additions and 678 deletions.
8 changes: 8 additions & 0 deletions server/controller/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-transport-httpclient5</artifactId>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package ai.starwhale.mlops.api;

import ai.starwhale.mlops.common.IdConverter;
import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogCollector;
import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogK8sCollectorFactory;
import ai.starwhale.mlops.schedule.impl.k8s.log.CancellableJobLogK8sCollectorFactory;
import ai.starwhale.mlops.schedule.log.TaskLogStreamingCollector;
import io.kubernetes.client.openapi.ApiException;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -51,7 +51,7 @@ public class DatasetBuildLogWsServer {

private Long id;

private CancellableJobLogCollector logCollector;
private TaskLogStreamingCollector logCollector;


@Autowired
Expand Down Expand Up @@ -79,7 +79,7 @@ public void onOpen(Session session, @PathParam("name") String name, @PathParam("
String line;
while (true) {
try {
if ((line = logCollector.readLine()) == null) {
if ((line = logCollector.readLine(10L)) == null) {
break;
}
sendMessage(line);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package ai.starwhale.mlops.api;

import ai.starwhale.mlops.common.IdConverter;
import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogCollector;
import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogK8sCollectorFactory;
import io.kubernetes.client.openapi.ApiException;
import ai.starwhale.mlops.domain.job.step.bo.Step;
import ai.starwhale.mlops.domain.task.bo.Task;
import ai.starwhale.mlops.exception.StarwhaleException;
import ai.starwhale.mlops.schedule.log.TaskLogCollectorFactory;
import ai.starwhale.mlops.schedule.log.TaskLogStreamingCollector;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -43,15 +45,15 @@ public class TaskLogWsServer {

private static IdConverter idConvertor;

private static CancellableJobLogK8sCollectorFactory logCollectorFactory;
private static TaskLogCollectorFactory taskLogCollectorFactory;

private Session session;

private String readerId;

private Long id;

private CancellableJobLogCollector logCollector;
private TaskLogStreamingCollector logCollector;


@Autowired
Expand All @@ -60,26 +62,27 @@ public void setIdConvertor(IdConverter idConvertor) {
}

@Autowired
public void setLogCollectorFactory(CancellableJobLogK8sCollectorFactory factory) {
TaskLogWsServer.logCollectorFactory = factory;
public void setTaskLogCollectorFactory(TaskLogCollectorFactory taskLogCollectorFactory) {
TaskLogWsServer.taskLogCollectorFactory = taskLogCollectorFactory;
}


@OnOpen
public void onOpen(Session session, @PathParam("taskId") String taskId) {
this.session = session;
this.readerId = session.getId();
this.id = idConvertor.revert(taskId);
try {
logCollector = logCollectorFactory.make(taskId);
} catch (IOException | ApiException e) {
logCollector = taskLogCollectorFactory.streamingCollector(Task.builder().id(id).step(new Step()).build());
} catch (StarwhaleException e) {
log.error("make k8s log collector failed", e);
}
log.info("Task log ws opened. reader={}, task={}", readerId, id);
executorService.submit(() -> {
String line;
while (true) {
try {
if ((line = logCollector.readLine()) == null) {
if ((line = logCollector.readLine(null)) == null) {
break;
}
sendMessage(line);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package ai.starwhale.mlops.api.protocol.job;

import ai.starwhale.mlops.schedule.k8s.ResourceEventHolder;
import ai.starwhale.mlops.schedule.impl.k8s.ResourceEventHolder;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public class ScheduleConfig {

/**
* when system receive kill signal when there are scheduled tasks running, system should wait for running tasks to
* when the system receives a kill signal while scheduled tasks are running, it should wait for those tasks to
* finish before exiting
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package ai.starwhale.mlops.domain.dataset;

import static ai.starwhale.mlops.schedule.k8s.ResourceOverwriteSpec.RESOURCE_CPU;
import static ai.starwhale.mlops.schedule.k8s.ResourceOverwriteSpec.RESOURCE_MEMORY;
import static ai.starwhale.mlops.schedule.impl.k8s.ResourceOverwriteSpec.RESOURCE_CPU;
import static ai.starwhale.mlops.schedule.impl.k8s.ResourceOverwriteSpec.RESOURCE_MEMORY;
import static cn.hutool.core.util.BooleanUtil.toInt;

import ai.starwhale.mlops.api.protocol.dataset.DatasetInfoVo;
Expand Down Expand Up @@ -75,10 +75,10 @@
import ai.starwhale.mlops.exception.SwValidationException;
import ai.starwhale.mlops.exception.SwValidationException.ValidSubject;
import ai.starwhale.mlops.exception.api.StarwhaleApiException;
import ai.starwhale.mlops.schedule.k8s.ContainerOverwriteSpec;
import ai.starwhale.mlops.schedule.k8s.K8sClient;
import ai.starwhale.mlops.schedule.k8s.K8sJobTemplate;
import ai.starwhale.mlops.schedule.k8s.ResourceOverwriteSpec;
import ai.starwhale.mlops.schedule.impl.k8s.ContainerOverwriteSpec;
import ai.starwhale.mlops.schedule.impl.k8s.K8sClient;
import ai.starwhale.mlops.schedule.impl.k8s.K8sJobTemplate;
import ai.starwhale.mlops.schedule.impl.k8s.ResourceOverwriteSpec;
import ai.starwhale.mlops.storage.StorageAccessService;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
Expand Down Expand Up @@ -607,7 +607,7 @@ var record = buildRecordMapper.selectById(id);
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new SwProcessException(ErrorType.DB,
MessageFormat.format("read build log path failed {}", id), e
MessageFormat.format("read build log path failed {0}", id), e
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import ai.starwhale.mlops.exception.StarwhaleException;
import ai.starwhale.mlops.exception.SwProcessException;
import ai.starwhale.mlops.exception.SwProcessException.ErrorType;
import ai.starwhale.mlops.schedule.k8s.K8sClient;
import ai.starwhale.mlops.schedule.k8s.K8sJobTemplate;
import ai.starwhale.mlops.schedule.impl.k8s.K8sClient;
import ai.starwhale.mlops.schedule.impl.k8s.K8sJobTemplate;
import ai.starwhale.mlops.storage.StorageAccessService;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Pod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package ai.starwhale.mlops.domain.job;

import ai.starwhale.mlops.api.protocol.runtime.DeviceVo;
import ai.starwhale.mlops.schedule.k8s.ResourceOverwriteSpec;
import ai.starwhale.mlops.schedule.impl.k8s.ResourceOverwriteSpec;
import java.util.ArrayList;
import java.util.List;
import org.springframework.stereotype.Service;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@
import ai.starwhale.mlops.exception.SwProcessException;
import ai.starwhale.mlops.exception.SwValidationException;
import ai.starwhale.mlops.exception.api.StarwhaleApiException;
import ai.starwhale.mlops.schedule.k8s.K8sClient;
import ai.starwhale.mlops.schedule.k8s.K8sJobTemplate;
import ai.starwhale.mlops.schedule.k8s.ResourceEventHolder;
import ai.starwhale.mlops.schedule.k8s.ResourceOverwriteSpec;
import ai.starwhale.mlops.schedule.impl.k8s.K8sClient;
import ai.starwhale.mlops.schedule.impl.k8s.K8sJobTemplate;
import ai.starwhale.mlops.schedule.impl.k8s.ResourceEventHolder;
import ai.starwhale.mlops.schedule.impl.k8s.ResourceOverwriteSpec;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kubernetes.client.custom.IntOrString;
import io.kubernetes.client.informer.ResourceEventHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import ai.starwhale.mlops.domain.task.status.WatchableTask;
import ai.starwhale.mlops.domain.task.status.WatchableTaskFactory;
import ai.starwhale.mlops.schedule.SwTaskScheduler;
import ai.starwhale.mlops.schedule.reporting.TaskReportReceiver;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -44,11 +45,14 @@ public class JobLoader {

final SwTaskScheduler swTaskScheduler;

final TaskReportReceiver taskReportReceiver;

public JobLoader(HotJobHolder jobHolder, WatchableTaskFactory watchableTaskFactory,
SwTaskScheduler swTaskScheduler) {
SwTaskScheduler swTaskScheduler, TaskReportReceiver taskReportReceiver) {
this.jobHolder = jobHolder;
this.watchableTaskFactory = watchableTaskFactory;
this.swTaskScheduler = swTaskScheduler;
this.taskReportReceiver = taskReportReceiver;
}

public Job load(@NotNull Job job, Boolean resumePausedOrFailTasks) {
Expand Down Expand Up @@ -91,7 +95,7 @@ void scheduleReadyTasks(Collection<Task> tasks) {
if (CollectionUtils.isEmpty(tasks)) {
return;
}
swTaskScheduler.schedule(tasks);
swTaskScheduler.schedule(tasks, taskReportReceiver);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import ai.starwhale.mlops.configuration.DockerSetting;
import ai.starwhale.mlops.domain.system.SystemSetting;
import ai.starwhale.mlops.domain.system.SystemSettingListener;
import ai.starwhale.mlops.schedule.k8s.K8sClient;
import ai.starwhale.mlops.schedule.impl.k8s.K8sClient;
import cn.hutool.json.JSONUtil;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@
import ai.starwhale.mlops.exception.SwValidationException;
import ai.starwhale.mlops.exception.SwValidationException.ValidSubject;
import ai.starwhale.mlops.exception.api.StarwhaleApiException;
import ai.starwhale.mlops.schedule.k8s.ContainerOverwriteSpec;
import ai.starwhale.mlops.schedule.k8s.K8sClient;
import ai.starwhale.mlops.schedule.k8s.K8sJobTemplate;
import ai.starwhale.mlops.schedule.impl.k8s.ContainerOverwriteSpec;
import ai.starwhale.mlops.schedule.impl.k8s.K8sClient;
import ai.starwhale.mlops.schedule.impl.k8s.K8sJobTemplate;
import ai.starwhale.mlops.storage.StorageAccessService;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public LatestVersionVo getLatestVersion() {
}

public List<ResourcePool> listResourcePools() {
return systemSettingService.getResourcePools();
return systemSettingService.getResourcePoolsFromWeb();
}

public void updateResourcePools(List<ResourcePool> resourcePools) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,17 @@ public ResourcePool queryResourcePool(String rpName) {
.orElse(ResourcePool.defaults());
}

public List<ResourcePool> getResourcePools() {
public List<ResourcePool> getResourcePoolsFromWeb() {
User user = userService.currentUserDetail();
var pools = CollectionUtils.isEmpty(this.systemSetting.getResourcePoolSetting())
? List.of(ResourcePool.defaults()) : this.systemSetting.getResourcePoolSetting();
var pools = getAllResourcePools();
return pools.stream().filter(rp -> rp.allowUser(user.getId())).collect(Collectors.toList());
}

/**
 * Returns the resource pools stored in the system setting, with no per-user filtering applied here.
 *
 * @return the configured pool list, or a single-element list holding {@code ResourcePool.defaults()}
 *         when {@code CollectionUtils.isEmpty} reports the configured setting as empty
 */
public List<ResourcePool> getAllResourcePools() {
    // Guard clause: a non-empty setting is returned as-is.
    if (!CollectionUtils.isEmpty(this.systemSetting.getResourcePoolSetting())) {
        return this.systemSetting.getResourcePoolSetting();
    }
    // Nothing configured: fall back to the default pool only.
    return List.of(ResourcePool.defaults());
}

public void updateResourcePools(List<ResourcePool> resourcePools) {
this.systemSetting.setResourcePoolSetting(resourcePools);
systemSettingMapper.put(querySetting());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import ai.starwhale.mlops.common.Constants;
import ai.starwhale.mlops.domain.runtime.RuntimeResource;
import ai.starwhale.mlops.schedule.k8s.ResourceOverwriteSpec;
import ai.starwhale.mlops.schedule.impl.k8s.ResourceOverwriteSpec;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public String logContent(Long taskId, String logFileName) {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new SwProcessException(ErrorType.DB,
MessageFormat.format("read log path from db failed {}", taskId),
MessageFormat.format("read log path from db failed {0}", taskId),
e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public boolean equals(Object obj) {
return false;
}
Task tsk = (Task) obj;
if (null != id) {
return this.id.equals(tsk.id);
}
return this.uuid.equals(tsk.uuid);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ public boolean couldTransfer(TaskStatus statusNow, TaskStatus statusNew) {
return transferMap.get(statusNow).contains(statusNew);
}

/**
 * Resolves the status that should actually be applied when moving from {@code statusNow}
 * towards {@code statusNew}.
 *
 * <p>When the current status is {@code CANCELLING}, the requested status is not applied directly:
 * a final requested status (per {@link #isFinal}) resolves to {@code CANCELED}, and any other
 * requested status leaves the result at {@code CANCELLING}. For every other current status the
 * requested status is returned unchanged.
 *
 * @param statusNow the current status
 * @param statusNew the requested status
 * @return the status to apply
 */
public TaskStatus transfer(TaskStatus statusNow, TaskStatus statusNew) {
    if (statusNow != CANCELLING) {
        return statusNew;
    }
    return isFinal(statusNew) ? CANCELED : CANCELLING;
}

/**
 * Tells whether {@code status} is final, i.e. the transfer map lists no statuses it can move to.
 *
 * @param status the status to check
 * @return {@code true} when the transfer map entry for {@code status} is empty
 */
public boolean isFinal(TaskStatus status) {
    var nextStatuses = transferMap.get(status);
    return nextStatuses.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void updateStatus(TaskStatus status) {
oldStatus, status, originalTask.getId());
return;
}
originalTask.updateStatus(status);
originalTask.updateStatus(taskStatusMachine.transfer(oldStatus, status));
log.debug("task status changed from {} to {} of id {}", oldStatus, status, originalTask.getId());
watchers.stream().filter(w -> {
if (TaskStatusChangeWatcher.SKIPPED_WATCHERS.get() == null) {
Expand Down
Loading

0 comments on commit 34eac83

Please sign in to comment.