Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Programmatically create permanent session #748

Merged
merged 56 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
082371a
first attempt
jmilkiewicz Nov 15, 2023
32ecef3
first test
jmilkiewicz Nov 15, 2023
d5252d5
second test
jmilkiewicz Nov 15, 2023
6f813e7
test 3
jmilkiewicz Nov 15, 2023
f158857
simplyfing given
jmilkiewicz Nov 15, 2023
6a74aba
remove unused crap
jmilkiewicz Nov 15, 2023
15135de
preferences over db
jmilkiewicz Nov 15, 2023
d4d5bf0
reverting to old version
jmilkiewicz Nov 15, 2023
152b882
rename
jmilkiewicz Nov 15, 2023
6e4b4e8
adding TODO
jmilkiewicz Nov 15, 2023
fd67da7
renames
jmilkiewicz Nov 15, 2023
c38d99c
adding TODO
jmilkiewicz Nov 15, 2023
6b2038c
reverting to base version
jmilkiewicz Nov 15, 2023
cb0f001
adding TODO
jmilkiewicz Nov 15, 2023
39d1b71
TODO rephrase
jmilkiewicz Nov 15, 2023
86b21dc
adding delegate implementation
jmilkiewicz Nov 15, 2023
9b2f5f1
adding TODO
jmilkiewicz Nov 15, 2023
5d6d77f
deleting only batch
jmilkiewicz Nov 15, 2023
09fc5ab
removing only session
jmilkiewicz Nov 15, 2023
bccd096
extracting commonalities
jmilkiewicz Nov 15, 2023
31bd46a
making protected instead of public so this method shall not be so eas…
jmilkiewicz Nov 15, 2023
183ceee
make it private
jmilkiewicz Nov 15, 2023
92d3b63
adding deleted column
jmilkiewicz Nov 16, 2023
cf1e9cf
soft delete
jmilkiewicz Nov 16, 2023
b850b38
test for findAllPermanentSessions
jmilkiewicz Nov 16, 2023
e0ae54c
adding TODO
jmilkiewicz Nov 16, 2023
a351df8
In starting of perm session there is a flow that first deletes perman…
jmilkiewicz Nov 16, 2023
2e5b308
removing PermanentSessionController
jmilkiewicz Nov 16, 2023
fc6700e
reverting a change
jmilkiewicz Nov 16, 2023
743fe31
to make tests pass :(
jmilkiewicz Nov 16, 2023
ca3778d
adding permanent param to submit params
jmilkiewicz Nov 17, 2023
23b0ae0
introducing SessionParameters as input to REST api creation
jmilkiewicz Nov 17, 2023
5e2bed2
adding horrible annotations
jmilkiewicz Nov 17, 2023
aad1c58
removing PermanentSession class
jmilkiewicz Nov 17, 2023
f208435
removing obsolete method and test
jmilkiewicz Nov 17, 2023
ee9e7a4
rename to findAllApplications
jmilkiewicz Nov 17, 2023
8dc0d10
renames
jmilkiewicz Nov 17, 2023
787ecc9
rename and make it protected
jmilkiewicz Nov 19, 2023
d17ec47
removing filtering
jmilkiewicz Nov 19, 2023
dd7fea6
findApplications accepts Enumset of ApplicationTypes now
jmilkiewicz Nov 19, 2023
6239478
removing one check
jmilkiewicz Nov 20, 2023
1f0cde9
disabling assertion which shall work but interfere with schedullers
jmilkiewicz Nov 20, 2023
12b6874
handling creating session with id that already exists
jmilkiewicz Nov 20, 2023
f95c5f5
adding REST part
jmilkiewicz Nov 20, 2023
cfc9a1f
remove dead code
jmilkiewicz Nov 20, 2023
22db837
solving comment
jmilkiewicz Nov 20, 2023
0d2a583
adding insertApplication
jmilkiewicz Nov 20, 2023
cf1d909
now catching UnableToExecuteStatementException
jmilkiewicz Nov 20, 2023
9f7e876
removing obsolete test
jmilkiewicz Nov 20, 2023
8175d51
we first delete perm session before creating it
jmilkiewicz Nov 21, 2023
7bee523
move down
jmilkiewicz Nov 21, 2023
f84f469
extracting enumset
jmilkiewicz Nov 21, 2023
fa46933
reverting
jmilkiewicz Nov 21, 2023
3c36f24
flaky test
jmilkiewicz Nov 21, 2023
b0a1327
reverting
jmilkiewicz Nov 21, 2023
cf125e1
remove TODO
jmilkiewicz Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.exacaster.lighter.application;

import com.exacaster.lighter.storage.Entity;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.micronaut.core.annotation.Introspected;

import java.time.LocalDateTime;
Expand All @@ -22,9 +23,11 @@ public class Application implements Entity {
private final LocalDateTime createdAt;
private final LocalDateTime contactedAt;

private final boolean deleted;

public Application(String id, ApplicationType type, ApplicationState state, String appId, String appInfo,
SubmitParams submitParams,
LocalDateTime createdAt, LocalDateTime contactedAt) {
LocalDateTime createdAt, LocalDateTime contactedAt, boolean deleted) {
this.id = id;
this.type = type;
this.state = state;
Expand All @@ -34,6 +37,7 @@ public Application(String id, ApplicationType type, ApplicationState state, Stri
this.createdAt = createdAt;
this.contactedAt = contactedAt;
this.kind = "pyspark";
this.deleted = deleted;
}

@Override
Expand All @@ -56,7 +60,7 @@ public String getAppId() {
public String getAppInfo() {
return appInfo;
}

public List<String> getLog() {
return log;
}
Expand All @@ -65,6 +69,16 @@ public SubmitParams getSubmitParams() {
return submitParams;
}

@JsonIgnore
public boolean isDeleted() {
return deleted;
}

@JsonIgnore
public boolean isNotDeleted() {
pdambrauskas marked this conversation as resolved.
Show resolved Hide resolved
return !isDeleted();
}

@Override
public LocalDateTime getCreatedAt() {
return createdAt;
Expand All @@ -89,6 +103,7 @@ public String toString() {
.add("submitParams=" + submitParams)
.add("createdAt=" + createdAt)
.add("contactedAt=" + contactedAt)
.add("deleted=" + deleted)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class ApplicationBuilder {
private SubmitParams submitParams;
private LocalDateTime createdAt;
private LocalDateTime contactedAt;
private boolean deleted;

private ApplicationBuilder() {

Expand Down Expand Up @@ -75,7 +76,12 @@ public ApplicationBuilder setContactedAt(LocalDateTime contactedAt) {
return this;
}

public ApplicationBuilder setDeleted(boolean deleted) {
this.deleted = deleted;
return this;
}

public Application build() {
return new Application(id, type, state, appId, appInfo, submitParams, createdAt, contactedAt);
return new Application(id, type, state, appId, appInfo, submitParams, createdAt, contactedAt, deleted);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public enum ApplicationType {
BATCH,
SESSION
SESSION,
PERMANENT_SESSION,
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package com.exacaster.lighter.application;

import static io.micronaut.core.convert.format.MapFormat.MapTransformation.FLAT;
import static io.micronaut.core.naming.conventions.StringConvention.RAW;
import static java.util.Optional.ofNullable;

import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.format.MapFormat;

import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

import static io.micronaut.core.convert.format.MapFormat.MapTransformation.FLAT;
import static io.micronaut.core.naming.conventions.StringConvention.RAW;
import static java.util.Optional.ofNullable;

@Introspected
public class SubmitParams {

Expand Down Expand Up @@ -148,4 +149,5 @@ public String toString() {
.add("conf=" + conf)
.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import com.exacaster.lighter.application.ApplicationBuilder;
import com.exacaster.lighter.application.ApplicationState;
import com.exacaster.lighter.application.ApplicationType;
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.application.SubmitParams;
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.storage.ApplicationStorage;
import com.exacaster.lighter.storage.SortOrder;
import jakarta.inject.Singleton;

import java.time.LocalDateTime;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import jakarta.inject.Singleton;

@Singleton
public class BatchService {
Expand All @@ -26,7 +28,7 @@ public BatchService(ApplicationStorage applicationStorage, Backend backend) {
}

public List<Application> fetch(Integer from, Integer size) {
return applicationStorage.findApplications(ApplicationType.BATCH, from, size);
return applicationStorage.findApplications(EnumSet.of(ApplicationType.BATCH), from, size);
}

public Application create(SubmitParams batch) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
package com.exacaster.lighter.application.sessions;

import static java.util.Optional.ofNullable;
import static net.javacrumbs.shedlock.core.LockAssert.assertLocked;
import static org.slf4j.LoggerFactory.getLogger;

import com.exacaster.lighter.application.Application;
import com.exacaster.lighter.application.ApplicationInfo;
import com.exacaster.lighter.application.ApplicationState;
import com.exacaster.lighter.application.ApplicationStatusHandler;
import com.exacaster.lighter.application.SubmitParams;
import com.exacaster.lighter.application.sessions.processors.StatementHandler;
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.concurrency.Waitable;
import com.exacaster.lighter.configuration.AppConfiguration;
import com.exacaster.lighter.storage.SortOrder;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Singleton;
import net.javacrumbs.shedlock.micronaut.SchedulerLock;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import org.slf4j.Logger;

import java.time.LocalDateTime;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.javacrumbs.shedlock.micronaut.SchedulerLock;
import org.slf4j.Logger;
import java.util.stream.Stream;

import static java.util.Optional.ofNullable;
import static net.javacrumbs.shedlock.core.LockAssert.assertLocked;
import static org.slf4j.LoggerFactory.getLogger;

@Singleton
public class SessionHandler {
Expand Down Expand Up @@ -51,28 +56,53 @@ public Waitable launch(Application application, Consumer<Throwable> errorHandler
}

@SchedulerLock(name = "keepPermanentSession", lockAtLeastFor = "1m")
@Scheduled(fixedRate = "1m")
@Scheduled(fixedRate = "1m", initialDelay = "2s")
public void keepPermanentSessions() throws InterruptedException {
assertLocked();
LOG.info("Start provisioning permanent sessions.");
for (var sessionConf : appConfiguration.getSessionConfiguration().getPermanentSessions()) {
var session = sessionService.fetchOne(sessionConf.getId());

final var allPermanentSessions = getPermanentSessionToCheck();

for (var perm : allPermanentSessions) {
var session = sessionService.fetchOne(perm.getSessionId());
if (session.map(Application::getState).filter(this::running).isEmpty() ||
session.flatMap(backend::getInfo).map(ApplicationInfo::getState).filter(this::running).isEmpty()) {
LOG.info("Permanent session {} needs to be (re)started.", sessionConf.getId());
var sessionToLaunch = sessionService.createSession(
sessionConf.getSubmitParams(),
sessionConf.getId()
LOG.info("Permanent session {} needs to be (re)started.", perm.getSessionId());
sessionService.deletePermanentSession(perm.sessionId);
var sessionToLaunch = sessionService.createPermanentSession(
perm.getSessionId(),
perm.getSubmitParams()
);

sessionService.deleteOne(sessionToLaunch);
launchSession(sessionToLaunch).waitCompletion();
LOG.info("Permanent session {} (re)started.", sessionConf.getId());
LOG.info("Permanent session {} (re)started.", perm.getSessionId());
}
}
LOG.info("End provisioning permanent sessions.");
}

private List<PermanentSessionParam> getPermanentSessionToCheck() {
final var dbPermanentSessions = sessionService.fetchAllPermanentSessions();

final var configurationPermanentSessions = appConfiguration.getSessionConfiguration().getPermanentSessions().stream().collect(
Collectors.toMap(permanentSession -> permanentSession.getId(), Function.identity()));

final var fromYamlOnly = Sets.difference(configurationPermanentSessions.keySet(), dbPermanentSessions.keySet()).stream().map(
id -> new PermanentSessionParam(id, configurationPermanentSessions.get(id).getSubmitParams())
);

final var intersection = Sets.intersection(configurationPermanentSessions.keySet(), dbPermanentSessions.keySet()).stream()
.filter(id -> dbPermanentSessions.get(id).isNotDeleted())
.map(id -> new PermanentSessionParam(id, dbPermanentSessions.get(id).getSubmitParams()));

final var fromStorageOnly = Sets.difference(dbPermanentSessions.keySet(), configurationPermanentSessions.keySet()).stream().map(
id -> new PermanentSessionParam(id, dbPermanentSessions.get(id).getSubmitParams())
);

return Stream.concat(fromStorageOnly, Stream.concat(fromYamlOnly, intersection)).collect(Collectors.toList());

}

@SchedulerLock(name = "processScheduledSessions")
@Scheduled(fixedRate = "${lighter.session.schedule-interval}")
public void processScheduledSessions() throws InterruptedException {
Expand All @@ -96,7 +126,7 @@ private Waitable launchSession(Application session) {
@Scheduled(fixedRate = "${lighter.session.track-running-interval}")
public void trackRunning() {
assertLocked();
var running = sessionService.fetchRunning();
var running = sessionService.fetchRunningSession();

var idleAndRunning = running.stream()
.collect(Collectors.groupingBy(statementStatusChecker::hasWaitingStatement));
Expand All @@ -112,9 +142,8 @@ public void handleTimeout() {
var sessionConfiguration = appConfiguration.getSessionConfiguration();
var timeoutInterval = sessionConfiguration.getTimeoutInterval();
if (timeoutInterval != null && !timeoutInterval.isZero()) {
sessionService.fetchRunning()
sessionService.fetchRunningSession()
.stream()
.filter(s -> isNotPermanent(sessionConfiguration, s))
.filter(s -> sessionConfiguration.shouldTimeoutActive() || !sessionService.isActive(s))
.filter(s -> sessionService.lastUsed(s.getId()).isBefore(LocalDateTime.now().minus(timeoutInterval)))
.peek(s -> LOG.info("Killing because of timeout {}, session: {}", timeoutInterval, s))
Expand All @@ -135,4 +164,22 @@ private <T> List<T> selfOrEmpty(List<T> list) {
private boolean running(ApplicationState state) {
return !state.isComplete();
}

private static class PermanentSessionParam {
private final String sessionId;
private final SubmitParams submitParams;

public PermanentSessionParam(String sessionId, SubmitParams submitParams) {
this.sessionId = sessionId;
this.submitParams = submitParams;
}

public String getSessionId() {
return sessionId;
}

public SubmitParams getSubmitParams() {
return submitParams;
}
}
}
Loading
Loading