Skip to content

Commit

Permalink
Merge pull request #27 from RedPillAnalytics/develop
Browse files Browse the repository at this point in the history
Corrected script-generation logic.
  • Loading branch information
stewartbryson authored Oct 9, 2018
2 parents 3677f26 + 0a5eaba commit 0a481f1
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 138 deletions.
87 changes: 87 additions & 0 deletions src/deployTest/groovy/ExecuteTest.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import groovy.util.logging.Slf4j
import org.gradle.testkit.runner.GradleRunner
import spock.lang.Ignore
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Title

@Slf4j
@Title("Execute :tasks")
class ExecuteTest extends Specification {

@Shared
File projectDir, buildDir, buildFile, resourcesDir

@Shared
String taskName

@Shared
def result, taskList

def setupSpec() {

projectDir = new File("${System.getProperty("projectDir")}/execute-test")
buildDir = new File(projectDir, 'build')
buildFile = new File(projectDir, 'build.gradle')
taskList = ['pipelineExecute']

resourcesDir = new File('src/test/resources')

copySource()

buildFile.write("""
|plugins {
| id 'com.redpillanalytics.gradle-confluent'
|}
|
|archivesBaseName = 'test'
|group = 'com.redpillanalytics'
|version = '1.0.0'
""".stripMargin())
}

def setup() {

copySource()
}

def copySource() {

new AntBuilder().copy(todir: projectDir) {
fileset(dir: resourcesDir)
}
}

// helper method
def executeSingleTask(String taskName, List otherArgs, Boolean logOutput = true) {

otherArgs.add(0, taskName)

log.warn "runner arguments: ${otherArgs.toString()}"

// execute the Gradle test build
result = GradleRunner.create()
.withProjectDir(projectDir)
.withArguments(otherArgs)
.withPluginClasspath()
.build()

// log the results
if (logOutput) log.warn result.getOutput()

return result

}

@Ignore
def "Execute :pipelineExecute task with default values"() {

given:
taskName = 'pipelineExecute'
result = executeSingleTask(taskName, ['-Si','--rerun-tasks'])

expect:
result.task(":${taskName}").outcome.name() != 'FAILED'

}
}
28 changes: 14 additions & 14 deletions src/main/groovy/com/redpillanalytics/KsqlRest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,22 @@ class KsqlRest {
*/
def execKsql(String sql, Map properties) {

def prepared = (sql + ';').replace('\n', '').replace(';;', ';')

log.info "Executing statement: $prepared"

def client = new RESTClient(baseUrl)
def response = client.post(path: '/ksql') {
client.defaultContentTypeHeader = "application/json"
def response

type ContentType.JSON
response = client.post(path: '/ksql') {
//type ContentType.JSON
// accept statements with either a ';' or not. Do that by replacing ';;' with ';'
json ksql: "$sql;".replace(';;',';'), streamsProperties: properties

json ksql: prepared, streamsProperties: properties
}

log.debug "response: ${response.toString()}"

def data = new JsonSlurper().parse(response.data)

log.debug "data: ${data.dump()}"
return data
log.info "response: ${response.json}"
return response.json
}

/**
Expand Down Expand Up @@ -66,7 +67,7 @@ class KsqlRest {
*
* @return JSON representation of the KSQL response payload.
*/
def execKsql(String sql, Boolean earliest = true) {
def execKsql(String sql, Boolean earliest = false) {

def data = execKsql(sql, (earliest ? ["ksql.streams.auto.offset.reset": "earliest"] : [:]))
return data
Expand All @@ -81,7 +82,7 @@ class KsqlRest {
*
* @return JSON representation of the KSQL response payload.
*/
def execKsql(List sql, Boolean earliest = true) {
def execKsql(List sql, Boolean earliest = false) {

sql.each {
execKsql(it, earliest)
Expand All @@ -95,11 +96,10 @@ class KsqlRest {
*/
def getProperties() {

def data = execKsql('LIST PROPERTIES;')
def data = execKsql('LIST PROPERTIES')
def properties = data[0].properties
log.debug "properties: ${properties.dump()}"
return properties

}

/**
Expand Down
57 changes: 0 additions & 57 deletions src/main/groovy/com/redpillanalytics/KsqlUtils.groovy

This file was deleted.

18 changes: 14 additions & 4 deletions src/main/groovy/com/redpillanalytics/gradle/ConfluentPlugin.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ class ConfluentPlugin implements Plugin<Project> {

def (extension, property) = key.toString().split(/\./)

//log.warn "Setting configuration property for extension: $extension, property: $property, value: $value"

if (extension == 'confluent' && project.confluent.hasProperty(property)) {

log.debug "Setting configuration property for extension: $extension, property: $property, value: $value"
Expand Down Expand Up @@ -148,7 +146,7 @@ class ConfluentPlugin implements Plugin<Project> {
description('Build a single KSQL deployment script with all the individual pipeline processes ordered.'
+ ' Primarily used for building a server start script.')

dirPath pipelineDir.canonicalPath
pipelinePath pipelineDir.canonicalPath
onlyIf { dir.exists() }

}
Expand All @@ -166,7 +164,9 @@ class ConfluentPlugin implements Plugin<Project> {
}

project.build.dependsOn tg.getTaskName('pipelineZip')
}

if (enablePipelines && tg.isDeployEnv) {
if (isUsableConfiguration('archives', pipelinePattern)) {

project.task(tg.getTaskName('pipelineExtract'), type: Copy) {
Expand All @@ -179,6 +179,14 @@ class ConfluentPlugin implements Plugin<Project> {

project.deploy.dependsOn tg.getTaskName('pipelineExtract')
}

//todo Add this when the Rest problem has been figured out
// project.task(tg.getTaskName('pipelineExecute'), type: ExecutePipelineTask) {
// group taskGroup
// description = "Execute all the KSQL pipelines--in hierarchical order--in the provided directory (recursively)."
// pipelinePath pipelineDir.canonicalPath
// onlyIf { dir.exists() }
// }
}

if (isUsableConfiguration('archives', functionPattern) && enableFunctions && tg.isDeployEnv) {
Expand All @@ -188,7 +196,9 @@ class ConfluentPlugin implements Plugin<Project> {
description = "Copy the KSQL custom function deployment dependency (or JAR file) into the deployment directory."
from getDependency('archives', functionPattern)
into { functionDeployDir }
if (project.extensions.confluent.functionArtifactName) rename {project.extensions.confluent.functionArtifactName}
if (project.extensions.confluent.functionArtifactName) rename {
project.extensions.confluent.functionArtifactName
}

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.redpillanalytics.gradle.tasks

import com.redpillanalytics.KsqlRest
import groovy.util.logging.Slf4j
import org.gradle.api.tasks.Internal
import org.gradle.api.tasks.TaskAction

/**
* Use the KSQL RESTful API to execute all pipelines in a particular directory. Note: not functioning yet.
*/
@Slf4j
class ExecutePipelineTask extends PipelineTask {

/**
* The KsqlRest Object for interacting with the KSQL REST Server.
*/
@Internal
KsqlRest rest = new KsqlRest()

/**
* The main Gradle Task method.
*/
@TaskAction
def executePipelines() {


//rest.execKsql(getDropSql(pipelines))

rest.execKsql(pipelineSql)
}
}
Original file line number Diff line number Diff line change
@@ -1,65 +1,14 @@
package com.redpillanalytics.gradle.tasks

import com.redpillanalytics.KsqlUtils
import groovy.util.logging.Slf4j
import org.gradle.api.DefaultTask
import org.gradle.api.file.FileTree
import org.gradle.api.tasks.*
import org.gradle.api.tasks.options.Option

@Slf4j
class PipelineScriptTask extends DefaultTask {

/**
* The top-level directory containing the subdirectories--ordered alphanumerically--of pipeline processes.
*/
@Input
@Option(option = "dirpath",
description = "The top-level directory containing the subdirectories--ordered alphanumerically--of pipeline processes."
)
String dirPath

/**
* When defined, the DROPS script is not constructed in reverse order.
*/
@Input
@Option(option = 'reverse-drops-disabled',
description = 'When defined, the DROPS script is not constructed in reverse order.'
)
boolean notReverseDrops

/**
* Gets the hierarchical collection of pipeline files, referred to in Gradle as a FileTree.
*
* @return The FileTree of pipeline KSQL statements.
*/
@Internal
FileTree getPipelineTree() {
return project.fileTree(dir)
}

/**
* Gets the hierarchical collection of pipeline files, sorted using folder structure and alphanumeric logic.
*
* @return The List of pipeline SQL files.
*/
@Internal
List getPipelines() {

def sorted = pipelineTree.sort()
return sorted
}

/**
* Returns a File object representation of the {@filePath} parameter.
*
* @return The File object representation of the {@filePath} parameter.
*/
@InputDirectory
File getDir() {

return project.file(dirPath)
}
/**
* Generate CREATE and DROP scripts used for deployment to KSQL Servers. Note: the DROP script is not currently being used... slated for future enhancements.
*/
@Slf4j
class PipelineScriptTask extends PipelineTask {

/**
* Returns a File object representation of the {@project.extensions.confluent.pipelineBuildName} parameter.
Expand Down Expand Up @@ -100,8 +49,8 @@ class PipelineScriptTask extends DefaultTask {
@OutputFile
File dropScript() {

createScript.delete()
KsqlUtils.getDropSql(pipelines, notReverseDrops).each {
dropScript.delete()
getDropSql(notReverseDrops).each {
dropScript.append(it)
}

Expand All @@ -115,12 +64,9 @@ class PipelineScriptTask extends DefaultTask {
File createScript() {

createScript.delete()

KsqlUtils.getCreateSql(pipelines).each { sql ->

createScript.append(sql)
pipelineSql.each { sql ->
createScript.append(sql + ";\n")
}

return createScript
}

Expand Down
Loading

0 comments on commit 0a481f1

Please sign in to comment.