Skip to content

Commit

Permalink
Merge pull request #17 from hynix/master
Browse files Browse the repository at this point in the history
get yarn token from webhdfs api
  • Loading branch information
hynix authored Feb 1, 2022
2 parents 213744d + 85f3156 commit 36e8ae5
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import com.exacaster.lighter.application.ApplicationType;
import com.exacaster.lighter.application.sessions.processors.StatementHandler;
import com.exacaster.lighter.backend.Backend;
import com.exacaster.lighter.configuration.AppConfiguration;
import com.exacaster.lighter.configuration.AppConfiguration.SessionConfiguration;
import com.exacaster.lighter.spark.SubmitParams;
import com.exacaster.lighter.storage.ApplicationStorage;
import jakarta.inject.Singleton;
Expand All @@ -23,14 +21,11 @@ public class SessionService {
private final ApplicationStorage applicationStorage;
private final Backend backend;
private final StatementHandler statementHandler;
private final SessionConfiguration sessionConfiguration;

public SessionService(ApplicationStorage applicationStorage, Backend backend, StatementHandler statementHandler,
AppConfiguration appConfiguration) {
public SessionService(ApplicationStorage applicationStorage, Backend backend, StatementHandler statementHandler) {
this.applicationStorage = applicationStorage;
this.backend = backend;
this.statementHandler = statementHandler;
this.sessionConfiguration = appConfiguration.getSessionConfiguration();
}

public List<Application> fetch(Integer from, Integer size) {
Expand All @@ -43,12 +38,14 @@ public Application createSession(SubmitParams params) {

public Application createSession(SubmitParams params, String sessionId) {
var submitParams = params.withNameAndFile("session_" + UUID.randomUUID(), backend.getSessionJobResources());
var now = LocalDateTime.now();
var entity = ApplicationBuilder.builder()
.setId(sessionId)
.setType(ApplicationType.SESSION)
.setState(ApplicationState.NOT_STARTED)
.setSubmitParams(submitParams)
.setCreatedAt(LocalDateTime.now())
.setCreatedAt(now)
.setContactedAt(now)
.build();
return applicationStorage.saveApplication(entity);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class YarnBackend implements Backend {

private static final Logger LOG = getLogger(YarnBackend.class);
private static final String TOKEN_ENDPOINT = "/ws/v1/cluster/delegation-token";
private static final String TOKEN_ENDPOINT = "/webhdfs/v1/?op=GETDELEGATIONTOKEN&renewer=lighter";

private final YarnProperties yarnProperties;
private final YarnClient client;
Expand All @@ -40,8 +40,9 @@ public YarnBackend(YarnProperties yarnProperties, YarnClient client, AppConfigur
this.yarnProperties = yarnProperties;
this.client = client;
this.conf = conf;
this.kerberosRestTemplate = Optional.ofNullable(yarnProperties.getKerberosKeytab())
.map(it -> new KerberosRestTemplate(it, yarnProperties.getKerberosPrincipal()));
this.kerberosRestTemplate = Optional.ofNullable(yarnProperties.getTokenUrl())
.map(it -> new KerberosRestTemplate(yarnProperties.getKerberosKeytab(),
yarnProperties.getKerberosPrincipal()));
}

@Override
Expand Down Expand Up @@ -94,10 +95,8 @@ public void kill(Application application) {
}

private Optional<String> getToken() {
var url = yarnProperties.getUrl() + TOKEN_ENDPOINT;
var body = Map.of("renewer", "lighter");
return kerberosRestTemplate
.map(it -> it.postForObject(url, body, Token.class))
.map(it -> it.getForObject(yarnProperties.getTokenUrl() + TOKEN_ENDPOINT, Token.class))
.map(Token::getToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ public class YarnProperties {

private final String kerberosPrincipal;
private final String kerberosKeytab;
private final String url;
private final String tokenUrl;

@ConfigurationInject
public YarnProperties(@Nullable String kerberosPrincipal, @Nullable String kerberosKeytab, String url) {
public YarnProperties(@Nullable String kerberosPrincipal, @Nullable String kerberosKeytab,
@Nullable String tokenUrl) {
this.kerberosPrincipal = kerberosPrincipal;
this.kerberosKeytab = kerberosKeytab;
this.url = url;
this.tokenUrl = tokenUrl;
}

public String getKerberosPrincipal() {
Expand All @@ -29,16 +30,16 @@ public String getKerberosKeytab() {
return kerberosKeytab;
}

public String getUrl() {
return url;
public String getTokenUrl() {
return tokenUrl;
}

@Override
public String toString() {
return new StringJoiner(", ", YarnProperties.class.getSimpleName() + "[", "]")
.add("kerberosPrincipal='" + kerberosPrincipal + "'")
.add("kerberosKeytab='" + kerberosKeytab + "'")
.add("url='" + url + "'")
.add("tokenUrl='" + tokenUrl + "'")
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonRootName;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;

@Introspected
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonRootName("Token")
public class Token {
private final String token;

@JsonCreator
public Token(@Nullable @JsonProperty("token") String token) {
public Token(@Nullable @JsonProperty("urlString") String token) {
this.token = token;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@ package com.exacaster.lighter.application.sessions
import com.exacaster.lighter.application.ApplicationState
import com.exacaster.lighter.application.sessions.processors.StatementHandler
import com.exacaster.lighter.backend.Backend
import com.exacaster.lighter.configuration.AppConfiguration
import com.exacaster.lighter.storage.ApplicationStorage
import com.exacaster.lighter.test.InMemoryStorage
import spock.lang.Specification
import spock.lang.Subject

import static com.exacaster.lighter.test.Factories.appConfiguration
import static com.exacaster.lighter.test.Factories.submitParams

class SessionServiceTest extends Specification {
ApplicationStorage storage = new InMemoryStorage()
Backend backend = Mock()
StatementHandler statementHandler = Mock()
AppConfiguration conf = appConfiguration()

@Subject
SessionService service = new SessionService(storage, backend, statementHandler, conf)
SessionService service = new SessionService(storage, backend, statementHandler)

def "manage sessions"() {
given:
Expand Down

0 comments on commit 36e8ae5

Please sign in to comment.