Skip to content

Commit

Permalink
[INLONG-10514][Manager] Support built-in schedule engine trigger subm…
Browse files Browse the repository at this point in the history
…itting of Flink batch job (apache#10515)
  • Loading branch information
aloyszhang committed Jul 9, 2024
1 parent 23e7dd0 commit b2bc33b
Show file tree
Hide file tree
Showing 14 changed files with 156 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,9 @@ public Boolean finishTagSwitch(String groupId) {
return response.getData();
}

public Boolean submitOfflineJob(String groupId) {
Response<Boolean> responseBody = ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(groupId));
ClientUtils.assertRespSuccess(responseBody);
return responseBody.getData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,7 @@ public interface InlongGroupApi {

@GET("group/switch/finish/{groupId}")
Call<Response<Boolean>> finishTagSwitch(@Path("groupId") String groupId);

@POST("group/submitOfflineJob/{groupId}")
Call<Response<Boolean>> submitOfflineJob(@Path("groupId") String groupId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class InlongConstants {
public static final Integer DATASYNC_REALTIME_MODE = 1;
public static final Integer DATASYNC_OFFLINE_MODE = 2;

public static final String RUNTIME_EXECUTION_MODE_STREAMING = "streaming";
public static final String RUNTIME_EXECUTION_MODE_STREAM = "stream";
public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";

public static final Integer DISABLE_ZK = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ public class FlinkOfflineJobOperator implements OfflineJobOperator {

@Override
public void submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList) throws Exception {
submitFlinkJobs(groupId, streamInfoList);
submitFlinkJobs(groupId, streamInfoList, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.common.consts.InlongConstants.RUNTIME_EXECUTION_MODE_BATCH;
import static org.apache.inlong.manager.common.consts.InlongConstants.RUNTIME_EXECUTION_MODE_STREAM;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION;
Expand Down Expand Up @@ -220,6 +222,11 @@ public static FlinkConfig getFlinkConfigFromFile() throws Exception {

public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamInfo> streamInfoList)
throws Exception {
return submitFlinkJobs(groupId, streamInfoList, false);
}

public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamInfo> streamInfoList,
boolean isBatchJob) throws Exception {
int sinkCount = streamInfoList.stream()
.map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
.reduce(0, Integer::sum);
Expand All @@ -232,7 +239,8 @@ public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamIn

List<ListenerResult> listenerResults = new ArrayList<>();
for (InlongStreamInfo streamInfo : streamInfoList) {
listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo, FlinkUtils.genFlinkJobName(streamInfo)));
listenerResults.add(
FlinkUtils.submitFlinkJob(streamInfo, FlinkUtils.genFlinkJobName(streamInfo), isBatchJob));
}

// only one stream in group for now
Expand All @@ -246,6 +254,11 @@ public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamIn
}

public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String jobName) throws Exception {
return submitFlinkJob(streamInfo, jobName, false);
}

public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String jobName, boolean isBatchJob)
throws Exception {
List<StreamSink> sinkList = streamInfo.getSinkList();
List<String> sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(sinkTypes)) {
Expand Down Expand Up @@ -283,6 +296,11 @@ public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
if (isBatchJob) {
flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_BATCH);
} else {
flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_STREAM);
}
FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
flinkOperation.genPath(flinkInfo, dataflow);
Expand Down
5 changes: 5 additions & 0 deletions inlong-manager/manager-schedule/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>manager-pojo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>manager-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,67 @@

package org.apache.inlong.manager.schedule.quartz;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import static org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_HOST;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_PORT;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_ID;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_KEY;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Service
public class QuartzOfflineSyncJob implements Job {

private ScheduleInfo scheduleInfo;
private static final Logger LOGGER = LoggerFactory.getLogger(QuartzOfflineSyncJob.class);

private volatile InlongGroupClient groupClient;

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// TODO: complete the offline sync logic
LOGGER.info("QuartzOfflineSyncJob run once");
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
initGroupClientIfNeeded(jobDataMap);
String inlongGroupId = context.getJobDetail().getKey().getName();
LOGGER.info("Starting submit offline job for group {}", inlongGroupId);
if (groupClient.submitOfflineJob(inlongGroupId)) {
LOGGER.info("Successfully submitting offline job for group {}", inlongGroupId);
} else {
LOGGER.warn("Failed to submit offline job for group {}", inlongGroupId);
}
}

private void initGroupClientIfNeeded(JobDataMap jobDataMap) {
if (groupClient == null) {
String host = (String) jobDataMap.get(MANAGER_HOST);
int port = (int) jobDataMap.get(MANAGER_PORT);
String secreteId = (String) jobDataMap.get(SECRETE_ID);
String secreteKey = (String) jobDataMap.get(SECRETE_KEY);
LOGGER.info("Initializing Inlong group client, with host: {}, port: {}, userName : {}",
host, port, secreteId);
ClientConfiguration configuration = new ClientConfiguration();
configuration.setAuthentication(new DefaultAuthentication(secreteId, secreteKey));
String serviceUrl = host + ":" + port;
InlongClientImpl inlongClient = new InlongClientImpl(serviceUrl, configuration);
ClientFactory clientFactory = ClientUtils.getClientFactory(inlongClient.getConfiguration());
groupClient = clientFactory.getGroupClient();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.HashSet;
Expand All @@ -50,6 +51,18 @@ public class QuartzScheduleEngine implements ScheduleEngine {

private static final Logger LOGGER = LoggerFactory.getLogger(QuartzScheduleEngine.class);

@Value("${server.host:127.0.0.1}")
private String host;

@Value("${server.port:8083}")
private int port;

@Value("${inlong.inner.secrete.id:admin}")
private String secretId;

@Value("${inlong.inner.secrete.key:87haw3VYTPqK5fK0}")
private String secretKey;

private final Scheduler scheduler;
private final Set<String> scheduledJobSet = new HashSet<>();

Expand All @@ -60,6 +73,7 @@ public QuartzScheduleEngine() {
} catch (SchedulerException e) {
throw new QuartzScheduleException("Failed to init quartz scheduler ", e);
}
start();
}

@Override
Expand All @@ -68,7 +82,8 @@ public void start() {
// add listener
scheduler.getListenerManager().addSchedulerListener(new QuartzSchedulerListener(this));
scheduler.start();
LOGGER.info("Quartz scheduler engine started");
LOGGER.info("Quartz scheduler engine started, inlong manager host {}, port {}, secretId {}",
host, port, secretId);
} catch (SchedulerException e) {
throw new QuartzScheduleException("Failed to start quartz scheduler ", e);
}
Expand All @@ -79,7 +94,7 @@ public void start() {
* */
public boolean triggerFinalized(Trigger trigger) {
String jobName = trigger.getJobKey().getName();
LOGGER.info("Trigger finalized for job {}", jobName);
LOGGER.info("Quartz trigger finalized for job {}", jobName);
return scheduledJobSet.remove(jobName);
}

Expand All @@ -97,12 +112,12 @@ public boolean handleRegister(ScheduleInfo scheduleInfo, Class<? extends Job> cl
if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
throw new QuartzScheduleException("Group " + scheduleInfo.getInlongGroupId() + " is already registered");
}
JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz);
JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz, host, port, secretId, secretKey);
Trigger trigger = genQuartzTrigger(jobDetail, scheduleInfo);
try {
scheduler.scheduleJob(jobDetail, trigger);
scheduledJobSet.add(scheduleInfo.getInlongGroupId());
LOGGER.info("Registered new schedule info for {}", scheduleInfo.getInlongGroupId());
LOGGER.info("Registered new quartz schedule info for {}", scheduleInfo.getInlongGroupId());
} catch (SchedulerException e) {
throw new QuartzScheduleException(e.getMessage());
}
Expand All @@ -123,7 +138,7 @@ public boolean handleUnregister(String groupId) {
}
}
scheduledJobSet.remove(groupId);
LOGGER.info("Un-registered schedule info for {}", groupId);
LOGGER.info("Un-registered quartz schedule info for {}", groupId);
return true;
}

Expand All @@ -140,7 +155,7 @@ public boolean handleUpdate(ScheduleInfo scheduleInfo) {
public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? extends Job> clz) {
handleUnregister(scheduleInfo.getInlongGroupId());
handleRegister(scheduleInfo, clz);
LOGGER.info("Updated schedule info for {}", scheduleInfo.getInlongGroupId());
LOGGER.info("Updated quartz schedule info for {}", scheduleInfo.getInlongGroupId());
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,42 @@
import org.quartz.SchedulerListener;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation for quartz scheduler listener.
* */
public class QuartzSchedulerListener implements SchedulerListener {

QuartzScheduleEngine quartzScheduleEngine;
private static final Logger LOGGER = LoggerFactory.getLogger(QuartzSchedulerListener.class);

private QuartzScheduleEngine quartzScheduleEngine;

public QuartzSchedulerListener(QuartzScheduleEngine quartzScheduleEngine) {
this.quartzScheduleEngine = quartzScheduleEngine;
}

@Override
public void jobScheduled(Trigger trigger) {

LOGGER.info("Quartz job with key {} scheduled", trigger.getKey().getName());
}

@Override
public void jobUnscheduled(TriggerKey triggerKey) {

LOGGER.info("Quartz job with key {} un-scheduled", triggerKey.getName());
}

@Override
public void triggerFinalized(Trigger trigger) {
quartzScheduleEngine.triggerFinalized(trigger);
LOGGER.info("Quartz trigger with key {} startTime {} ande endTime {} is finalized",
trigger.getKey().getName(), trigger.getStartTime(), trigger.getEndTime());
}

@Override
public void triggerPaused(TriggerKey triggerKey) {

LOGGER.info("Quartz trigger with key {} paused", triggerKey.getName());
}

@Override
Expand All @@ -62,7 +68,7 @@ public void triggersPaused(String triggerGroup) {

@Override
public void triggerResumed(TriggerKey triggerKey) {

LOGGER.info("Quartz trigger with key {} Resume", triggerKey.getName());
}

@Override
Expand All @@ -72,17 +78,17 @@ public void triggersResumed(String triggerGroup) {

@Override
public void jobAdded(JobDetail jobDetail) {

LOGGER.info("New quartz job added, name {}", jobDetail.getKey().getName());
}

@Override
public void jobDeleted(JobKey jobKey) {

LOGGER.info("Quartz job deleted, name {}", jobKey.getName());
}

@Override
public void jobPaused(JobKey jobKey) {

LOGGER.info("Quartz job paused, name {}", jobKey.getName());
}

@Override
Expand All @@ -92,7 +98,7 @@ public void jobsPaused(String jobGroup) {

@Override
public void jobResumed(JobKey jobKey) {

LOGGER.info("Quartz job resumed, name {}", jobKey.getName());
}

@Override
Expand All @@ -102,7 +108,7 @@ public void jobsResumed(String jobGroup) {

@Override
public void schedulerError(String msg, SchedulerException cause) {

LOGGER.warn("Quartz schedule exception, errorMsg {}", msg, cause);
}

@Override
Expand All @@ -112,26 +118,26 @@ public void schedulerInStandbyMode() {

@Override
public void schedulerStarted() {

LOGGER.warn("Quartz scheduler started");
}

@Override
public void schedulerStarting() {

LOGGER.warn("Quartz scheduler starting");
}

@Override
public void schedulerShutdown() {

LOGGER.warn("Quartz scheduler shutdown");
}

@Override
public void schedulerShuttingdown() {

LOGGER.warn("Quartz scheduler shutting down");
}

@Override
public void schedulingDataCleared() {

LOGGER.warn("Quartz scheduler data cleared");
}
}
Loading

0 comments on commit b2bc33b

Please sign in to comment.