Skip to content

Commit

Permalink
Merge pull request #33 from RedPillAnalytics/develop
Browse files Browse the repository at this point in the history
Corrected deployment issues. Added relative path option for --pipeline-dir.
  • Loading branch information
stewartbryson authored Oct 18, 2018
2 parents 5e60ab9 + 2a349fe commit e12f8d8
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 93 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
This is a Gradle Plugin for building and deploying content to the Confluent Platform. Unfortunately, I've been struggling to find time to write a proper README, but it's coming. Until then, you can refer to the [API documentation](https://s3.amazonaws.com/documentation.redpillanalytics.com/gradle-confluent/latest/index.html).

You can get this plugin from the [Gradle Plugin Portal](https://plugins.gradle.org/plugin/com.redpillanalytics.gradle-confluent).

34 changes: 24 additions & 10 deletions src/ksqlPipelinesTest/groovy/ExecuteTest.groovy
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import groovy.util.logging.Slf4j
import org.gradle.testkit.runner.GradleRunner
import spock.lang.Ignore
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Stepwise
import spock.lang.Title

@Slf4j
@Stepwise
@Title("Execute :tasks")
class ExecuteTest extends Specification {

Expand Down Expand Up @@ -33,8 +34,8 @@ class ExecuteTest extends Specification {
|plugins {
| id 'com.redpillanalytics.gradle-confluent'
|}
confluent.pipelineEndpoint = 'http://localhost:8088'
""".stripMargin())
|confluent.pipelineEndpoint = 'http://localhost:8088'
|""".stripMargin())
}

def setup() {
Expand Down Expand Up @@ -92,31 +93,32 @@ class ExecuteTest extends Specification {

}

def "Execute :pipelineExecute task with --no-drop"() {
def "Execute :pipelineExecute task with --no-create"() {

given:
taskName = 'pipelineExecute'
result = executeSingleTask(taskName, ['--no-drop','-Si','--rerun-tasks'])
result = executeSingleTask(taskName, ['--no-create','-Si','--rerun-tasks'])

expect:
result.task(":${taskName}").outcome.name() != 'FAILED'
!result.output.toLowerCase().contains('drop table')
!result.output.toLowerCase().contains('create table')
!result.output.toLowerCase().contains('insert into')

}

def "Execute :pipelineExecute task with --no-terminate"() {
def "Execute :pipelineExecute task with --no-drop"() {

given:
taskName = 'pipelineExecute'
result = executeSingleTask(taskName, ['--no-terminate','-Si','--rerun-tasks'])
result = executeSingleTask(taskName, ['--no-drop','-Si','--rerun-tasks'])

expect:
result.task(":${taskName}").outcome.name() != 'FAILED'
!result.output.toLowerCase().contains('terminate')
!result.output.toLowerCase().contains('drop table')

}

def "Execute :pipelineExecute task with --no-create"() {
def "Execute :pipelineExecute task with --no-create again"() {

given:
taskName = 'pipelineExecute'
Expand All @@ -129,6 +131,18 @@ class ExecuteTest extends Specification {

}

def "Execute :pipelineExecute task with --no-terminate"() {

given:
taskName = 'pipelineExecute'
result = executeSingleTask(taskName, ['--no-terminate','-Si','--rerun-tasks'])

expect:
result.task(":${taskName}").outcome.name() != 'FAILED'
!result.output.toLowerCase().contains('terminate')

}

def "Execute :pipelineExecute task with --from-beginning"() {

given:
Expand Down
187 changes: 133 additions & 54 deletions src/main/groovy/com/redpillanalytics/KsqlRest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.mashape.unirest.http.Unirest
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.util.logging.Slf4j
import org.gradle.api.GradleException

@Slf4j
/**
Expand All @@ -24,7 +25,7 @@ class KsqlRest {
*
* @param properties Any KSQL parameters to include with the KSQL execution.
*
* @return Map with meaningful elements from the JSON payload elevated as attributes, plus a 'body' key will the full JSON payload.
* @return Map with meaningful elements returned in the REST call, plus a 'body' key with the full JSON payload.
*/
def execKsql(String sql, Map properties) {

Expand All @@ -42,28 +43,17 @@ class KsqlRest {
def body = new JsonSlurper().parseText(response.body)

def result = [
status : response.status,
statusText : response.statusText,
message : body.message,
statementText : body.statementText,
commandStatus : body.commandStatus ? body.commandStatus.status[0] : '',
commandMessage: body.commandStatus ? body.commandStatus.message[0] : '',
body : body
status : response.status,
statusText: response.statusText,
body : body
]
log.info "status: ${result.status}, statusText: ${result.statusText}"

if (result.statusText != 'OK' && (!result.message.toLowerCase()?.contains('cannot drop') || result.commandMessage?.toLowerCase()?.contains('does not exist') )) {
def newResult = [
status: result.status,
statusText: result.statusText,
message: result.message,
commandStatus: result.commandStatus,
commandMessage: result.commandMessage
]

log.info "result: $newResult"
}
log.debug "body: $result.body"
//log.warn "Status: ${response.commandStatus}"

return result

}

/**
Expand All @@ -83,81 +73,170 @@ class KsqlRest {
}

/**
* Executes a List of KSQL DROP statements using the KSQL RESTful API. Manages issuing TERMINATE statements as part of the DROP, if desired.
* Executes a KSQL statement using the KSQL RESTful API.
*
* @param sql the List of SQL DROP statements to execute.
* @param sql The SQL statement to execute.
*
* @param properties Any KSQL parameters to include with the KSQL execution.
* @param earliest Boolean dictating that the statement should set 'auto.offset.reset' to 'earliest'.
*
* @param terminate Determines whether TERMINATE statements are issued, along with a retry of the DROP.
* @return Map with meaningful elements from the JSON payload elevated as attributes, plus a 'body' key will the full JSON payload.
*/
def execKsql(String sql, Boolean earliest = false) {

def data = execKsql(sql, (earliest ? ["ksql.streams.auto.offset.reset": "earliest"] : [:]))
return data
}

/**
* Executes a List of KSQL statements using the KSQL RESTful API.
*
* @param sql the List of SQL statements to execute.
*
* @param earliest Boolean dictating that the statement should set 'auto.offset.reset' to 'earliest'.
*
* @return Map with meaningful elements from the JSON payload elevated as attributes, plus a 'body' key will the full JSON payload.
*/
def dropKsql(List sql, Map properties, Boolean terminate = true) {
def execKsql(List sql, Boolean earliest = false) {

sql.each {
execKsql(it, earliest)
}
}

def result = execKsql(it, properties)
/**
* Executes a KSQL statement using the KSQL RESTful API. Optimized for issuing CREATE TABLE/STREAM statements.
*
* @param sql the SQL statement to execute.
*
* @param properties Any KSQL parameters to include with the KSQL execution.
*
* @return Map with meaningful elements returned in the REST call, plus a 'body' key with the full JSON payload.
*/
def createKsql(String sql, Map properties) {

log.debug "result: ${result}"
def response = execKsql(sql, properties)

if (result.status == 400 && result.message.toLowerCase().contains('cannot drop') && terminate) {
//log a message first
def result = [
status : response.status,
statusText : response.statusText,
error_code : response.body.error_code,
message : response.body.message,
statementText : response.body.statementText,
commandId : response.body.commandId,
commandStatus : response.body.commandStatus,
commandMessage: response.body.commandStatus,
body : response.body
]

// could also use the DESCRIBE command REST API results to get read and write queries to terminate
// but it's pretty easy to grab it from the DROP command REST API payload
def matches = result.message.findAll(~/(\[)([^\]]*)(\])/) { match, b1, list, b2 ->
list
}
// Two "string lists" are returned first
String read = matches[0]
String write = matches[1]
if (result.error_code.findResult { it }) {
throw new GradleException("error_code: ${result.error_code}: ${result.message}")
}

// Get a list of all queries currently executing
List queries = read.tokenize(',') + write.tokenize(',')
log.debug "queries: ${queries.toString()}"
log.debug "result: $result"

// now terminate with extreme prejudice
queries.each { queryId ->
execKsql("TERMINATE ${queryId}", properties)
}
return result
}

// now drop the table again
// this time using the non-explicit DROP method
// no Infinite Loops here
execKsql(it)
}
/**
* Executes a List of KSQL statements using the KSQL RESTful API. Optimized for issuing CREATE TABLE/STREAM statements.
*
* @param sql the List of SQL statements to execute.
*
* @param properties Any KSQL parameters to include with the KSQL execution.
*
* @return Map with meaningful elements from the JSON payload elevated as attributes, plus a 'body' key will the full JSON payload.
*/
def createKsql(List sql, Map properties) {

sql.each {
createKsql(it, properties)
}
}

/**
* Executes a KSQL statement using the KSQL RESTful API.
* Executes a KSQL statement using the KSQL RESTful API. Optimized for issuing CREATE TABLE/STREAM statements.
*
* @param sql The SQL statement to execute.
*
* @param earliest Boolean dictating that the statement should set 'auto.offset.reset' to 'earliest'.
*
* @return Map with meaningful elements from the JSON payload elevated as attributes, plus a 'body' key will the full JSON payload.
*/
def execKsql(String sql, Boolean earliest = false) {
def createKsql(String sql, Boolean earliest = false) {

def data = execKsql(sql, (earliest ? ["ksql.streams.auto.offset.reset": "earliest"] : [:]))
def data = createKsql(sql, (earliest ? ["ksql.streams.auto.offset.reset": "earliest"] : [:]))
return data
}

/**
* Executes a List of KSQL statements using the KSQL RESTful API.
* Executes a List of KSQL statements using the KSQL RESTful API. Optimized for issuing CREATE TABLE/STREAM statements.
*
* @param sql the List of SQL statements to execute.
*
* @param earliest Boolean dictating that the statement should set 'auto.offset.reset' to 'earliest'.
*
* @return Map with meaningful elements from the JSON payload elevated as attributes, plus a 'body' key will the full JSON payload.
*/
def execKsql(List sql, Boolean earliest = false) {
def createKsql(List sql, Boolean earliest = false) {

sql.each {
execKsql(it, earliest)
createKsql(it, earliest)
}
}

/**
* Executes a List of KSQL DROP statements using the KSQL RESTful API. Manages issuing TERMINATE statements as part of the DROP, if desired.
*
* @param sql the List of SQL DROP statements to execute.
*
* @param properties Any KSQL parameters to include with the KSQL execution.
*
* @param terminate Determines whether TERMINATE statements are issued, along with a retry of the DROP.
*
* @return Map with meaningful elements from the JSON payload elevated as attributes, plus a 'body' key will the full JSON payload.
*/
def dropKsql(List sql, Map properties, Boolean terminate = true) {

sql.each {

def result = execKsql(it, properties)

log.debug "result: ${result}"

if (result.status == 400 && result.body.message.toLowerCase().contains('cannot drop')) {

if (terminate) {
//log a message first
log.warn "Queries exist. Terminating..."

// could also use the DESCRIBE command REST API results to get read and write queries to terminate
// but it's pretty easy to grab it from the DROP command REST API payload
def matches = result.body.message.findAll(~/(\[)([^\]]*)(\])/) { match, b1, list, b2 ->
list
}
// Two "string lists" are returned first
String read = matches[0]
String write = matches[1]

// Get a list of all queries currently executing
List queries = read.tokenize(',') + write.tokenize(',')
log.debug "queries: ${queries.toString()}"

// now terminate with extreme prejudice
queries.each { queryId ->
execKsql("TERMINATE ${queryId}", properties)
}
log.warn "Executing DROP again..."

// now drop the table again
// this time using the non-explicit DROP method
// no Infinite Loops here
execKsql(it)

} else {
log.warn "Queries exist, but '--no-terminate' option provided."
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class PipelineExecuteTask extends PipelineTask {
if (!noDrop) ksqlRest.dropKsql(dropSql, [:], !noTerminate)

// now create the pipelines
if (!noCreate) ksqlRest.execKsql(pipelineSql, fromBeginning)
if (!noCreate) ksqlRest.createKsql(pipelineSql, fromBeginning)

}
}
Loading

0 comments on commit e12f8d8

Please sign in to comment.