Skip to content

Commit

Permalink
Make models free of XContent
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Apr 17, 2024
1 parent 204c7da commit 012be35
Show file tree
Hide file tree
Showing 21 changed files with 1,076 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,9 @@

package org.opensearch.sql.spark.asyncquery.model;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID;

import com.google.gson.Gson;
import java.io.IOException;
import java.util.Locale;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateModel;
Expand Down Expand Up @@ -134,29 +124,6 @@ public String toString() {
return new Gson().toJson(this);
}

/**
* Converts JobMetadata to XContentBuilder.
*
* @return XContentBuilder {@link XContentBuilder}
* @throws Exception Exception.
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(QUERY_ID, queryId.getId())
.field("type", TYPE_JOBMETA)
.field("jobId", jobId)
.field("applicationId", applicationId)
.field("resultIndex", resultIndex)
.field("sessionId", sessionId)
.field(DATASOURCE_NAME, datasourceName)
.field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT))
.field(INDEX_NAME, indexName)
.endObject();
return builder;
}

/** copy builder. update seqNo and primaryTerm */
public static AsyncQueryJobMetadata copy(
AsyncQueryJobMetadata copy, long seqNo, long primaryTerm) {
Expand All @@ -173,72 +140,6 @@ public static AsyncQueryJobMetadata copy(
primaryTerm);
}

/**
* Convert xcontent parser to JobMetadata.
*
* @param parser parser.
* @return JobMetadata {@link AsyncQueryJobMetadata}
* @throws IOException IOException.
*/
@SneakyThrows
public static AsyncQueryJobMetadata fromXContent(
XContentParser parser, long seqNo, long primaryTerm) {
AsyncQueryId queryId = null;
String jobId = null;
String applicationId = null;
String resultIndex = null;
String sessionId = null;
String datasourceName = null;
String jobTypeStr = null;
String indexName = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case QUERY_ID:
queryId = new AsyncQueryId(parser.textOrNull());
break;
case "jobId":
jobId = parser.textOrNull();
break;
case "applicationId":
applicationId = parser.textOrNull();
break;
case "resultIndex":
resultIndex = parser.textOrNull();
break;
case "sessionId":
sessionId = parser.textOrNull();
break;
case DATASOURCE_NAME:
datasourceName = parser.textOrNull();
case JOB_TYPE:
jobTypeStr = parser.textOrNull();
case INDEX_NAME:
indexName = parser.textOrNull();
case "type":
break;
default:
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
if (jobId == null || applicationId == null) {
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr),
indexName,
seqNo,
primaryTerm);
}

@Override
public String getId() {
return queryId.docId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,19 @@

package org.opensearch.sql.spark.dispatcher.model;

import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Plugin create Index DML result. */
@Data
@EqualsAndHashCode(callSuper = false)
public class IndexDMLResult extends StateModel {
private static final String QUERY_ID = "queryId";
private static final String QUERY_RUNTIME = "queryRunTime";
private static final String UPDATE_TIME = "updateTime";
private static final String DOC_ID_PREFIX = "index";
public static final String QUERY_ID = "queryId";
public static final String QUERY_RUNTIME = "queryRunTime";
public static final String UPDATE_TIME = "updateTime";
public static final String DOC_ID_PREFIX = "index";

private final String queryId;
private final String status;
Expand Down Expand Up @@ -55,20 +50,4 @@ public long getSeqNo() {
public long getPrimaryTerm() {
return SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(QUERY_ID, queryId)
.field("status", status)
.field("error", error)
.field(DATASOURCE_NAME, datasourceName)
.field(QUERY_RUNTIME, queryRunTime)
.field(UPDATE_TIME, updateTime)
.field("result", ImmutableList.of())
.field("schema", ImmutableList.of())
.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,8 @@
import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE;

import java.io.IOException;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

Expand Down Expand Up @@ -48,24 +43,6 @@ public class SessionModel extends StateModel {
private final long seqNo;
private final long primaryTerm;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(VERSION, version)
.field(TYPE, SESSION_DOC_TYPE)
.field(SESSION_TYPE, sessionType.getSessionType())
.field(SESSION_ID, sessionId.getSessionId())
.field(SESSION_STATE, sessionState.getSessionState())
.field(DATASOURCE_NAME, datasourceName)
.field(APPLICATION_ID, applicationId)
.field(JOB_ID, jobId)
.field(LAST_UPDATE_TIME, lastUpdateTime)
.field(ERROR, error)
.endObject();
return builder;
}

public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
Expand Down Expand Up @@ -99,52 +76,6 @@ public static SessionModel copyWithState(
.build();
}

@SneakyThrows
public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) {
SessionModelBuilder builder = new SessionModelBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case VERSION:
builder.version(parser.text());
break;
case SESSION_TYPE:
builder.sessionType(SessionType.fromString(parser.text()));
break;
case SESSION_ID:
builder.sessionId(new SessionId(parser.text()));
break;
case SESSION_STATE:
builder.sessionState(SessionState.fromString(parser.text()));
break;
case DATASOURCE_NAME:
builder.datasourceName(parser.text());
break;
case ERROR:
builder.error(parser.text());
break;
case APPLICATION_ID:
builder.applicationId(parser.text());
break;
case JOB_ID:
builder.jobId(parser.text());
break;
case LAST_UPDATE_TIME:
builder.lastUpdateTime(parser.longValue());
break;
case TYPE:
// do nothing.
break;
}
}
builder.seqNo(seqNo);
builder.primaryTerm(primaryTerm);
return builder.build();
}

public static SessionModel initInteractiveSession(
String applicationId, String jobId, SessionId sid, String datasourceName) {
return builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,10 @@

package org.opensearch.sql.spark.execution.statement;

import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID;
import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID;
import static org.opensearch.sql.spark.execution.statement.StatementState.WAITING;

import java.io.IOException;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.statestore.StateModel;
Expand Down Expand Up @@ -55,27 +47,6 @@ public class StatementModel extends StateModel {
private final long seqNo;
private final long primaryTerm;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(VERSION, version)
.field(TYPE, STATEMENT_DOC_TYPE)
.field(STATEMENT_STATE, statementState.getState())
.field(STATEMENT_ID, statementId.getId())
.field(SESSION_ID, sessionId.getSessionId())
.field(APPLICATION_ID, applicationId)
.field(JOB_ID, jobId)
.field(LANG, langType.getText())
.field(DATASOURCE_NAME, datasourceName)
.field(QUERY, query)
.field(QUERY_ID, queryId)
.field(SUBMIT_TIME, submitTime)
.field(ERROR, error)
.endObject();
return builder;
}

public static StatementModel copy(StatementModel copy, long seqNo, long primaryTerm) {
return builder()
.version("1.0")
Expand Down Expand Up @@ -115,61 +86,6 @@ public static StatementModel copyWithState(
.build();
}

@SneakyThrows
public static StatementModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) {
StatementModel.StatementModelBuilder builder = StatementModel.builder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case VERSION:
builder.version(parser.text());
break;
case TYPE:
// do nothing
break;
case STATEMENT_STATE:
builder.statementState(StatementState.fromString(parser.text()));
break;
case STATEMENT_ID:
builder.statementId(new StatementId(parser.text()));
break;
case SESSION_ID:
builder.sessionId(new SessionId(parser.text()));
break;
case APPLICATION_ID:
builder.applicationId(parser.text());
break;
case JOB_ID:
builder.jobId(parser.text());
break;
case LANG:
builder.langType(LangType.fromString(parser.text()));
break;
case DATASOURCE_NAME:
builder.datasourceName(parser.text());
break;
case QUERY:
builder.query(parser.text());
break;
case QUERY_ID:
builder.queryId(parser.text());
break;
case SUBMIT_TIME:
builder.submitTime(parser.longValue());
break;
case ERROR:
builder.error(parser.text());
break;
}
}
builder.seqNo(seqNo);
builder.primaryTerm(primaryTerm);
return builder.build();
}

public static StatementModel submitStatement(
SessionId sid,
String applicationId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.sql.spark.execution.statestore;

public interface CopyBuilder<T> {
T of(T copy, long seqNo, long primaryTerm);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.sql.spark.execution.statestore;

import org.opensearch.core.xcontent.XContentParser;

public interface FromXContent<T extends StateModel> {
T fromXContent(XContentParser parser, long seqNo, long primaryTerm);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.sql.spark.execution.statestore;

public interface StateCopyBuilder<T extends StateModel, S> {
T of(T copy, S state, long seqNo, long primaryTerm);
}
Loading

0 comments on commit 012be35

Please sign in to comment.