Skip to content

Commit

Permalink
[INLONG-10459][Manager] Support schedule instance callback to submit …
Browse files Browse the repository at this point in the history
…Flink batch job (#10510)

* [INLONG-10459][Manager] Support schedule instance callback to submit Flink batch job
  • Loading branch information
aloyszhang authored and Aloys Zhang committed Jul 9, 2024
1 parent 4cd124b commit 1c37054
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
Expand All @@ -32,9 +30,9 @@

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;

/**
* Listener of startup sort.
Expand Down Expand Up @@ -62,8 +60,7 @@ public boolean accept(WorkflowContext workflowContext) {
}

log.info("add startup group listener for groupId [{}]", groupId);
return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())
|| InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
}

@Override
Expand All @@ -77,36 +74,9 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
}

GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
// do not build sort config if the group mode is offline
if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
log.info("no need to launching sort job for groupId={} as the mode is offline",
groupId);
return ListenerResult.success();
}
List<InlongStreamInfo> streamInfos = groupResourceForm.getStreamInfos();
int sinkCount = streamInfos.stream()
.map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
.reduce(0, Integer::sum);
if (sinkCount == 0) {
log.warn("not any sink configured for group {}, skip launching sort job", groupId);
return ListenerResult.success();
}

List<ListenerResult> listenerResults = new ArrayList<>();
for (InlongStreamInfo streamInfo : streamInfos) {
listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo,
FlinkUtils.genFlinkJobName(processForm, streamInfo)));
}

// only one stream in group for now
// we can return the list of ListenerResult if support multi-stream in the future
List<ListenerResult> failedStreams = listenerResults.stream()
.filter(t -> !t.isSuccess()).collect(Collectors.toList());
if (failedStreams.isEmpty()) {
return ListenerResult.success();
}
return ListenerResult.fail(failedStreams.get(0).getRemark());
return submitFlinkJobs(groupId, streamInfos);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.plugin.offline;

import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;

import lombok.NoArgsConstructor;

import java.util.List;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;

@NoArgsConstructor
public class FlinkOfflineJobOperator implements OfflineJobOperator {

@Override
public void submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList) throws Exception {
submitFlinkJobs(groupId, streamInfoList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,33 @@ public static FlinkConfig getFlinkConfigFromFile() throws Exception {
return flinkConfig;
}

public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamInfo> streamInfoList)
throws Exception {
int sinkCount = streamInfoList.stream()
.map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
.reduce(0, Integer::sum);
if (sinkCount == 0) {
log.warn("Not any sink configured for group {} and stream list {}, skip launching sort job", groupId,
streamInfoList.stream()
.map(s -> s.getInlongGroupId() + ":" + s.getName()).collect(Collectors.toList()));
return ListenerResult.success();
}

List<ListenerResult> listenerResults = new ArrayList<>();
for (InlongStreamInfo streamInfo : streamInfoList) {
listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo, FlinkUtils.genFlinkJobName(streamInfo)));
}

// only one stream in group for now
// we can return the list of ListenerResult if support multi-stream in the future
List<ListenerResult> failedStreams = listenerResults.stream()
.filter(t -> !t.isSuccess()).collect(Collectors.toList());
if (failedStreams.isEmpty()) {
return ListenerResult.success();
}
return ListenerResult.fail(failedStreams.get(0).getRemark());
}

public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String jobName) throws Exception {
List<StreamSink> sinkList = streamInfo.getSinkList();
List<String> sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
Expand Down Expand Up @@ -295,4 +322,9 @@ public static String genFlinkJobName(ProcessForm processForm, InlongStreamInfo s
return Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN
+ streamInfo.getInlongStreamId();
}

public static String genFlinkJobName(InlongStreamInfo streamInfo) {
return String.format(Constants.SORT_JOB_NAME_TEMPLATE, streamInfo.getInlongGroupId()) + InlongConstants.HYPHEN
+ streamInfo.getInlongStreamId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,16 @@ public boolean equals(Object o) {
return false;
}
ScheduleInfo that = (ScheduleInfo) o;
return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit,
that.scheduleUnit)
return Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType)
&& Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend,
that.selfDepend)
&& Objects.equals(startTime, that.startTime)
&& Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime)
&& Objects.equals(selfDepend, that.selfDepend)
&& Objects.equals(taskParallelism, that.taskParallelism)
&& Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version,
that.version);
&& Objects.equals(crontabExpression, that.crontabExpression);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ public boolean equals(Object o) {
return false;
}
ScheduleInfoRequest that = (ScheduleInfoRequest) o;
return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit,
that.scheduleUnit)
return Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType)
&& Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend,
that.selfDepend)
&& Objects.equals(startTime, that.startTime)
&& Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime)
&& Objects.equals(selfDepend, that.selfDepend)
&& Objects.equals(taskParallelism, that.taskParallelism)
&& Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version,
that.version);
&& Objects.equals(crontabExpression, that.crontabExpression);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class NoopScheduleClient implements ScheduleEngineClient {

@Override
public boolean accept(String engineType) {
return ScheduleEngineType.NONE.getType().equals(engineType);
return ScheduleEngineType.NONE.getType().equalsIgnoreCase(engineType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,11 @@ void updateAfterApprove(
*/
List<GroupFullInfo> getGroupByBackUpClusterTag(String clusterTag);

/**
* Submitting offline job for the given group.
* @param groupId the inlong group to submit offline job
*
* */
Boolean submitOfflineJob(String groupId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -931,4 +931,22 @@ public List<GroupFullInfo> getGroupByBackUpClusterTag(String clusterTag) {
return groupInfoList;
}

@Override
public Boolean submitOfflineJob(String groupId) {
// 1. get stream info list
InlongGroupInfo groupInfo = get(groupId);
if (groupInfo == null) {
String msg = String.format("InLong group not found for group=%s", groupId);
LOGGER.error(msg);
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}

List<InlongStreamInfo> streamInfoList = streamService.list(groupId);
if (CollectionUtils.isEmpty(streamInfoList)) {
LOGGER.warn("No stream info found for group {}, skip submit offline job", groupId);
return false;
}
return scheduleOperator.submitOfflineJob(groupId, streamInfoList);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc

try {
for (InlongStreamInfo streamInfo : streamInfos) {
// do not build sort config if the group mode is offline
if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
LOGGER.info("no need to build sort config for groupId={} streamId={} as the mode is offline",
groupId, streamInfo.getInlongStreamId());
continue;
}
List<StreamSink> sinkList = streamInfo.getSinkList();
if (CollectionUtils.isEmpty(sinkList)) {
continue;
Expand All @@ -143,8 +137,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
}
}
} catch (Exception e) {
String msg = String.format("failed to build sort config for groupId=%s, ", groupId);
LOGGER.error(msg + "streamInfos=" + streamInfos, e);
String msg = String.format("Failed to build sort config for group=%s, ", groupId);
LOGGER.error("{} streamInfos={}", msg, streamInfos, e);
throw new WorkflowListenerException(msg + e.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.schedule;

import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OfflineJobOperatorFactory {

private static final Logger log = LoggerFactory.getLogger(OfflineJobOperatorFactory.class);
public static final String DEFAULT_OPERATOR_CLASS_NAME =
"org.apache.inlong.manager.plugin.offline.FlinkOfflineJobOperator";

public static OfflineJobOperator getOfflineJobOperator() {
return getOfflineJobOperator(DEFAULT_OPERATOR_CLASS_NAME);
}

public static OfflineJobOperator getOfflineJobOperator(String operatorClassName) {
return getOfflineJobOperator(operatorClassName, Thread.currentThread().getContextClassLoader());
}

public static OfflineJobOperator getOfflineJobOperator(String operatorClassName, ClassLoader classLoader) {
try {
Class<?> operatorClass = classLoader.loadClass(operatorClassName);
Object operator = operatorClass.getDeclaredConstructor().newInstance();
return (OfflineJobOperator) operator;
} catch (Throwable e) {
log.error("Failed to get offline job operator: ", e);
throw new BusinessException("Failed to get offline job operator: " + e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;

import java.util.List;

/**
* Operator for schedule. Including:
Expand Down Expand Up @@ -92,4 +95,12 @@ public interface ScheduleOperator {
* @Return whether succeed
* */
Boolean handleGroupApprove(String groupId);

/**
* Start offline sync job when the schedule instance callback.
* @param groupId groupId to start offline job
* @param streamInfoList stream list to start offline job
* @Return whether succeed
* */
Boolean submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList);
}
Loading

0 comments on commit 1c37054

Please sign in to comment.