Merge pull request #69 from data-catering/insta-integration

Insta integration
pflooky authored Jul 8, 2024
2 parents 53ad9da + c4acccc commit 7bfcde2
Showing 14 changed files with 205 additions and 153 deletions.
17 changes: 12 additions & 5 deletions .github/workflows/check.yml
@@ -1,4 +1,4 @@
name: Check if build passes
name: Check if build passes and integration tests are successful

on:
push:
@@ -8,10 +8,17 @@ on:
jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
with:
fetch-depth: 2
- name: Run build
run: "./gradlew build"
run: "./gradlew clean :app:shadowJar"
- name: Run integration tests
id: tests
uses: data-catering/insta-integration@v1
- name: Print results
run: |
echo "Records generated: ${{ steps.tests.outputs.num_records_generated }}"
echo "Successful validations: ${{ steps.tests.outputs.num_success_validations }}"
echo "Failed validations: ${{ steps.tests.outputs.num_failed_validations }}"
echo "Number of validations: ${{ steps.tests.outputs.num_validations }}"
echo "Validation success rate: ${{ steps.tests.outputs.validation_success_rate }}"
6 changes: 3 additions & 3 deletions Dockerfile
@@ -3,11 +3,11 @@ FROM amazoncorretto:22-alpine
USER root
RUN addgroup -S app \
&& adduser -S app -G app --uid 1001 \
&& apk update \
&& apk add --no-cache libc6-compat \
&& apk update --no-cache \
&& apk update --no-cache openssl \
&& apk add --no-cache libc6-compat bash \
&& mkdir -p /opt/app /opt/DataCaterer/connection /opt/DataCaterer/plan /opt/DataCaterer/execution /opt/DataCaterer/report \
&& chown -R app:app /opt/app /opt/DataCaterer/connection /opt/DataCaterer/plan /opt/DataCaterer/execution /opt/DataCaterer/report \
&& apk add --no-cache bash
COPY --chown=app:app script app/src/main/resources app/build/libs /opt/app/

USER app
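For a quick local check of the Dockerfile changes, the image can be built from the repository root and started the same way the README runs the published image. A sketch, where the `local` tag is purely illustrative (release images are built and pushed via `docker-action.sh` with buildx):

```shell
# Build the shaded jar first so the COPY of app/build/libs has something to pick up,
# then build and run the image in standalone mode, mirroring the README's docker run command.
./gradlew clean :app:shadowJar
docker build -t datacatering/data-caterer-basic:local .
docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer-basic:local
```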
5 changes: 3 additions & 2 deletions README.md
@@ -48,7 +48,7 @@ and deep dive into issues [from the generated report](https://data.catering/samp
3. [Linux download](https://nightly.link/data-catering/data-caterer/workflows/build/main/data-caterer-linux.zip)
4. Docker
```shell
docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer-basic:0.10.8
docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer-basic:0.11.7
```
[Open localhost:9898](http://localhost:9898).

@@ -206,4 +206,5 @@ jpackage "@misc/jpackage/jpackage.cfg" "@misc/jpackage/jpackage-linux.cfg"

```shell
--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
```
```
-Dlog4j.configurationFile=classpath:log4j2.properties
```
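Taken together, a sketch of how these options might be passed when running the shaded jar directly (jar path as used in `insta-integration.yaml`; `JAVA_OPTS` here abbreviates the full `--add-opens` list shown above):

```shell
# Illustrative launch command; JAVA_OPTS stands in for the complete --add-opens list above.
JAVA_OPTS="--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"  # ...plus the remaining opens
java $JAVA_OPTS -Dlog4j.configurationFile=classpath:log4j2.properties -jar app/build/libs/data-caterer.jar
```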
7 changes: 7 additions & 0 deletions app/build.gradle.kts
@@ -52,6 +52,7 @@ dependencies {
basicImpl("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
exclude(group = "com.google.protobuf")
exclude(module = "netty-codec-http")
exclude(module = "netty-codec-http2")
exclude(module = "woodstox-core")
exclude(module = "nimbus-jose-jwt")
exclude(module = "commons-net")
@@ -71,12 +72,16 @@ dependencies {
exclude(module = "wildfly-openssl")
exclude(module = "wildfly-openssl-java")
exclude(module = "xnio-api")
exclude(module = "aircompressor")
exclude(module = "zookeeper")
exclude(module = "guava")
}
basicImpl(project(":api"))

// vulnerabilities in Spark
basicImpl("com.google.protobuf:protobuf-java:3.25.3")
basicImpl("io.netty:netty-codec-http:4.1.110.Final")
basicImpl("io.netty:netty-codec-http2:4.1.110.Final")
basicImpl("com.fasterxml.woodstox:woodstox-core:6.6.2")
basicImpl("com.nimbusds:nimbus-jose-jwt:9.39.3")
basicImpl("commons-net:commons-net:3.11.0")
@@ -96,6 +101,8 @@ dependencies {
basicImpl("org.wildfly.openssl:wildfly-openssl:1.1.3.Final")
basicImpl("org.wildfly.openssl:wildfly-openssl-java:1.1.3.Final")
basicImpl("org.jboss.xnio:xnio-api:3.8.15.Final")
basicImpl("io.airlift:aircompressor:0.27")
basicImpl("org.apache.zookeeper:zookeeper:3.9.2")
//basicImpl("software.amazon.ion:ion-java:1.5.1") //should use: basicImpl("com.amazon.ion:ion-java:1.11.8")

// connectors
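Since these pins exist to override vulnerable transitive versions pulled in by Spark, it is worth confirming they actually win dependency resolution. A quick check using Gradle's built-in dependencies report:

```shell
# Print the resolved dependency tree for the app module and confirm the pinned versions
# (e.g. zookeeper 3.9.2, aircompressor 0.27, netty-codec-http2 4.1.110.Final) appear.
./gradlew :app:dependencies | grep -E "zookeeper|aircompressor|netty-codec-http2"
```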
@@ -52,26 +52,21 @@ class DataGeneratorProcessor(dataCatererConfiguration: DataCatererConfiguration)
}
val stepNames = summaryWithTask.map(t => s"task=${t._2.name}, num-steps=${t._2.steps.size}, steps=${t._2.steps.map(_.name).mkString(",")}").mkString("||")

if (summaryWithTask.isEmpty) {
LOGGER.warn("No tasks found or no tasks enabled. No data will be generated or validated")
PlanRunResults(List(), List())
} else {
val generationResult = if (flagsConfig.enableGenerateData) {
LOGGER.debug(s"Following tasks are enabled and will be executed: num-tasks=${summaryWithTask.size}, tasks=$stepNames")
batchDataProcessor.splitAndProcess(plan, summaryWithTask)
} else List()
val generationResult = if (flagsConfig.enableGenerateData && summaryWithTask.nonEmpty) {
LOGGER.debug(s"Following tasks are enabled and will be executed: num-tasks=${summaryWithTask.size}, tasks=$stepNames")
batchDataProcessor.splitAndProcess(plan, summaryWithTask)
} else List()

val validationResults = if (flagsConfig.enableValidation) {
new ValidationProcessor(connectionConfigsByName, optValidations, dataCatererConfiguration.validationConfig, foldersConfig)
.executeValidations
} else List()
val validationResults = if (flagsConfig.enableValidation) {
new ValidationProcessor(connectionConfigsByName, optValidations, dataCatererConfiguration.validationConfig, foldersConfig)
.executeValidations
} else List()

applyPostPlanProcessors(plan, sparkRecordListener, generationResult, validationResults)
val optReportPath = if (flagsConfig.enableSaveReports) {
plan.runId.map(id => s"${foldersConfig.generatedReportsFolderPath}/$id").orElse(Some(foldersConfig.generatedReportsFolderPath))
} else None
PlanRunResults(generationResult, validationResults, optReportPath)
}
applyPostPlanProcessors(plan, sparkRecordListener, generationResult, validationResults)
val optReportPath = if (flagsConfig.enableSaveReports) {
plan.runId.map(id => s"${foldersConfig.generatedReportsFolderPath}/$id").orElse(Some(foldersConfig.generatedReportsFolderPath))
} else None
PlanRunResults(generationResult, validationResults, optReportPath)
}

private def applyPostPlanProcessors(plan: Plan, sparkRecordListener: SparkRecordListener,
@@ -1,75 +1,75 @@
package io.github.datacatering.datacaterer.core.util

import com.google.protobuf.DescriptorProtos
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto
import com.google.protobuf.Descriptors.FieldDescriptor
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

import java.io.{BufferedInputStream, FileInputStream}
import scala.collection.JavaConverters.asScalaBufferConverter

object ProtobufUtil {

def toStructType(descriptorFile: String): Map[String, StructType] = {
val file = new BufferedInputStream(new FileInputStream(descriptorFile))
val fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(file)
fileDescriptorSet.getFileList.asScala
.flatMap(fd => {
fd.getMessageTypeList.asScala.toList.map(message => {
(message.getName, StructType(getSchemaFromFieldsProto(message.getFieldList.asScala.toList)))
})
// (fd.getName, StructType(getSchemaFromFields(fd.getMessageTypeList.asScala.toList)))
}).toMap
}

private def getSchemaFromFields(fields: List[FieldDescriptor]): Array[StructField] = {
fields.map(field => {
val dataType = getDataTypeForField(field)
StructField(field.getName, dataType, !field.isRequired)
}).toArray
}

private def getSchemaFromFieldsProto(fields: List[FieldDescriptorProto]): Array[StructField] = {
fields.map(field => {
val dataType = getDataTypeForField(field)
StructField(field.getName, dataType)
}).toArray
}

private def getDataTypeForField(fieldDescriptor: FieldDescriptor): DataType = {
fieldDescriptor.getJavaType match {
case JavaType.BOOLEAN => DataTypes.BooleanType
case JavaType.INT => DataTypes.IntegerType
case JavaType.LONG => DataTypes.LongType
case JavaType.DOUBLE => DataTypes.DoubleType
case JavaType.FLOAT => DataTypes.FloatType
case JavaType.STRING => DataTypes.StringType
case JavaType.ENUM => DataTypes.StringType
case JavaType.BYTE_STRING => DataTypes.BinaryType
case JavaType.MESSAGE => {
new StructType(getSchemaFromFields(fieldDescriptor.getMessageType.getFields.asScala.toList))
}
case _ => throw new RuntimeException(s"Unable to parse proto type, type=${fieldDescriptor.getType}")
}
}

private def getDataTypeForField(fieldDescriptor: FieldDescriptorProto): DataType = {
// val nonProtoField = FieldDescriptor.Type.valueOf(fieldDescriptor.getType)
FieldDescriptor.Type.valueOf(fieldDescriptor.getType).getJavaType match {
case JavaType.BOOLEAN => DataTypes.BooleanType
case JavaType.INT => DataTypes.IntegerType
case JavaType.LONG => DataTypes.LongType
case JavaType.DOUBLE => DataTypes.DoubleType
case JavaType.FLOAT => DataTypes.FloatType
case JavaType.STRING => DataTypes.StringType
case JavaType.ENUM => DataTypes.StringType
case JavaType.BYTE_STRING => DataTypes.BinaryType
case JavaType.MESSAGE => {
new StructType(getSchemaFromFields(fieldDescriptor.getDescriptorForType.getFields.asScala.toList))
}
case _ => throw new RuntimeException(s"Unable to parse proto type, type=${fieldDescriptor}")
}
}

}
//package io.github.datacatering.datacaterer.core.util
//
//import com.google.protobuf.DescriptorProtos
//import com.google.protobuf.DescriptorProtos.FieldDescriptorProto
//import com.google.protobuf.Descriptors.FieldDescriptor
//import com.google.protobuf.Descriptors.FieldDescriptor.JavaType
//import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
//
//import java.io.{BufferedInputStream, FileInputStream}
//import scala.collection.JavaConverters.asScalaBufferConverter
//
//object ProtobufUtil {
//
// def toStructType(descriptorFile: String): Map[String, StructType] = {
// val file = new BufferedInputStream(new FileInputStream(descriptorFile))
// val fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(file)
// fileDescriptorSet.getFileList.asScala
// .flatMap(fd => {
// fd.getMessageTypeList.asScala.toList.map(message => {
// (message.getName, StructType(getSchemaFromFieldsProto(message.getFieldList.asScala.toList)))
// })
// // (fd.getName, StructType(getSchemaFromFields(fd.getMessageTypeList.asScala.toList)))
// }).toMap
// }
//
// private def getSchemaFromFields(fields: List[FieldDescriptor]): Array[StructField] = {
// fields.map(field => {
// val dataType = getDataTypeForField(field)
// StructField(field.getName, dataType, !field.isRequired)
// }).toArray
// }
//
// private def getSchemaFromFieldsProto(fields: List[FieldDescriptorProto]): Array[StructField] = {
// fields.map(field => {
// val dataType = getDataTypeForField(field)
// StructField(field.getName, dataType)
// }).toArray
// }
//
// private def getDataTypeForField(fieldDescriptor: FieldDescriptor): DataType = {
// fieldDescriptor.getJavaType match {
// case JavaType.BOOLEAN => DataTypes.BooleanType
// case JavaType.INT => DataTypes.IntegerType
// case JavaType.LONG => DataTypes.LongType
// case JavaType.DOUBLE => DataTypes.DoubleType
// case JavaType.FLOAT => DataTypes.FloatType
// case JavaType.STRING => DataTypes.StringType
// case JavaType.ENUM => DataTypes.StringType
// case JavaType.BYTE_STRING => DataTypes.BinaryType
// case JavaType.MESSAGE => {
// new StructType(getSchemaFromFields(fieldDescriptor.getMessageType.getFields.asScala.toList))
// }
// case _ => throw new RuntimeException(s"Unable to parse proto type, type=${fieldDescriptor.getType}")
// }
// }
//
// private def getDataTypeForField(fieldDescriptor: FieldDescriptorProto): DataType = {
// // val nonProtoField = FieldDescriptor.Type.valueOf(fieldDescriptor.getType)
// FieldDescriptor.Type.valueOf(fieldDescriptor.getType).getJavaType match {
// case JavaType.BOOLEAN => DataTypes.BooleanType
// case JavaType.INT => DataTypes.IntegerType
// case JavaType.LONG => DataTypes.LongType
// case JavaType.DOUBLE => DataTypes.DoubleType
// case JavaType.FLOAT => DataTypes.FloatType
// case JavaType.STRING => DataTypes.StringType
// case JavaType.ENUM => DataTypes.StringType
// case JavaType.BYTE_STRING => DataTypes.BinaryType
// case JavaType.MESSAGE => {
// new StructType(getSchemaFromFields(fieldDescriptor.getDescriptorForType.getFields.asScala.toList))
// }
// case _ => throw new RuntimeException(s"Unable to parse proto type, type=${fieldDescriptor}")
// }
// }
//
//}
@@ -0,0 +1,11 @@
name: "account_balance_and_transactions_create_plan"
description: "Create balances and transactions in Postgres"
tasks:
- name: "jdbc_customer_balance_and_transactions"
dataSourceName: "postgres"

sinkOptions:
foreignKeys:
- - "postgres.balances.account_number"
- - "postgres.transactions.account_number"
- []
@@ -0,0 +1,49 @@
name: "jdbc_customer_balance_and_transactions"
steps:
- name: "balances"
type: "postgres"
count:
records: 1000
options:
dbtable: "account.balances"
schema:
fields:
- name: "account_number"
generator:
type: "regex"
options:
regex: "ACC1[0-9]{5,10}"
isUnique: true
- name: "create_time"
type: "timestamp"
- name: "account_status"
type: "string"
generator:
type: "oneOf"
options:
oneOf:
- "open"
- "closed"
- "suspended"
- name: "balance"
type: "double"
- name: "transactions"
type: "postgres"
count:
perColumn:
columnNames:
- "account_number"
count: 5
options:
dbtable: "account.transactions"
schema:
fields:
- name: "account_number"
- name: "create_time"
type: "timestamp"
- name: "transaction_id"
generator:
options:
regex: "txn-[0-9]{10}"
- name: "amount"
type: "double"
@@ -24,7 +24,7 @@ class PlanParserTest extends SparkSuite {
test("Can parse task in YAML file") {
val result = PlanParser.parseTasks("src/test/resources/sample/task")

assert(result.length == 12)
assert(result.length == 13)
}

test("Can parse plan in YAML file with foreign key") {
2 changes: 0 additions & 2 deletions docker-action.sh
@@ -25,6 +25,4 @@ docker buildx create --use --name builder
docker buildx inspect --bootstrap builder

docker buildx build --platform $platforms \
--build-arg "APP_VERSION=$version" \
--build-arg "SPARK_VERSION=$sparkVersion" \
-t datacatering/data-caterer-basic:$version --push .
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,5 +1,5 @@
groupId=io.github.data-catering
version=0.11.7
version=0.11.8

scalaVersion=2.12
scalaSpecificVersion=2.12.19
26 changes: 26 additions & 0 deletions insta-integration.yaml
@@ -0,0 +1,26 @@
services:
- name: postgres
data: app/src/test/resources/sample/sql/postgres
run:
- command: java -jar app/build/libs/data-caterer.jar
env:
PLAN_FILE_PATH: app/src/test/resources/sample/plan/account-balance-transaction-plan.yaml
TASK_FOLDER_PATH: app/src/test/resources/sample/task
test:
validation:
postgres:
- options:
dbtable: account.balances
validations:
- expr: ISNOTNULL(account_id)
- aggType: count
aggExpr: count == 1000
- options:
dbtable: account.transactions
validations:
- expr: ISNOTNULL(account_id)
- aggType: count
aggExpr: count == 5000
- groupByCols: [account_number]
aggType: count
aggExpr: count == 5
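The expected counts line up with the task definition: 1000 `balances` records, and 5 `transactions` per `account_number` for 5000 in total. To reproduce the run outside the GitHub Action, the same command and environment variables from the config above can be used directly, assuming a local Postgres seeded from the SQL folder:

```shell
# Run the same plan locally (paths and env vars taken from insta-integration.yaml above);
# assumes a Postgres instance is available and configured as the plan's connection expects.
./gradlew clean :app:shadowJar
export PLAN_FILE_PATH=app/src/test/resources/sample/plan/account-balance-transaction-plan.yaml
export TASK_FOLDER_PATH=app/src/test/resources/sample/task
java -jar app/build/libs/data-caterer.jar
```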
