Insta integration #69

Merged · 2 commits · Jul 8, 2024
17 changes: 12 additions & 5 deletions .github/workflows/check.yml
@@ -1,4 +1,4 @@
name: Check if build passes
name: Check if build passes and integration tests are successful

on:
  push:
@@ -8,10 +8,17 @@ on:
jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 2
      - name: Run build
        run: "./gradlew build"
        run: "./gradlew clean :app:shadowJar"
      - name: Run integration tests
        id: tests
        uses: data-catering/insta-integration@v1
      - name: Print results
        run: |
          echo "Records generated: ${{ steps.tests.outputs.num_records_generated }}"
          echo "Successful validations: ${{ steps.tests.outputs.num_success_validations }}"
          echo "Failed validations: ${{ steps.tests.outputs.num_failed_validations }}"
          echo "Number of validations: ${{ steps.tests.outputs.num_validations }}"
          echo "Validation success rate: ${{ steps.tests.outputs.validation_success_rate }}"
6 changes: 3 additions & 3 deletions Dockerfile
@@ -3,11 +3,11 @@ FROM amazoncorretto:22-alpine
USER root
RUN addgroup -S app \
&& adduser -S app -G app --uid 1001 \
&& apk update \
&& apk add --no-cache libc6-compat \
&& apk update --no-cache \
&& apk update --no-cache openssl \
&& apk add --no-cache libc6-compat bash \
&& mkdir -p /opt/app /opt/DataCaterer/connection /opt/DataCaterer/plan /opt/DataCaterer/execution /opt/DataCaterer/report \
&& chown -R app:app /opt/app /opt/DataCaterer/connection /opt/DataCaterer/plan /opt/DataCaterer/execution /opt/DataCaterer/report \
&& apk add --no-cache bash
COPY --chown=app:app script app/src/main/resources app/build/libs /opt/app/

USER app
5 changes: 3 additions & 2 deletions README.md
@@ -48,7 +48,7 @@ and deep dive into issues [from the generated report](https://data.catering/samp
3. [Linux download](https://nightly.link/data-catering/data-caterer/workflows/build/main/data-caterer-linux.zip)
4. Docker
```shell
docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer-basic:0.10.8
docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer-basic:0.11.7
```
[Open localhost:9898](http://localhost:9898).

@@ -206,4 +206,5 @@ jpackage "@misc/jpackage/jpackage.cfg" "@misc/jpackage/jpackage-linux.cfg"

```shell
--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
```
```
-Dlog4j.configurationFile=classpath:log4j2.properties
7 changes: 7 additions & 0 deletions app/build.gradle.kts
@@ -52,6 +52,7 @@ dependencies {
basicImpl("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
exclude(group = "com.google.protobuf")
exclude(module = "netty-codec-http")
exclude(module = "netty-codec-http2")
exclude(module = "woodstox-core")
exclude(module = "nimbus-jose-jwt")
exclude(module = "commons-net")
@@ -71,12 +72,16 @@ dependencies {
exclude(module = "wildfly-openssl")
exclude(module = "wildfly-openssl-java")
exclude(module = "xnio-api")
exclude(module = "aircompressor")
exclude(module = "zookeeper")
exclude(module = "guava")
}
basicImpl(project(":api"))

// vulnerabilities in Spark
basicImpl("com.google.protobuf:protobuf-java:3.25.3")
basicImpl("io.netty:netty-codec-http:4.1.110.Final")
basicImpl("io.netty:netty-codec-http2:4.1.110.Final")
basicImpl("com.fasterxml.woodstox:woodstox-core:6.6.2")
basicImpl("com.nimbusds:nimbus-jose-jwt:9.39.3")
basicImpl("commons-net:commons-net:3.11.0")
@@ -96,6 +101,8 @@ dependencies {
basicImpl("org.wildfly.openssl:wildfly-openssl:1.1.3.Final")
basicImpl("org.wildfly.openssl:wildfly-openssl-java:1.1.3.Final")
basicImpl("org.jboss.xnio:xnio-api:3.8.15.Final")
basicImpl("io.airlift:aircompressor:0.27")
basicImpl("org.apache.zookeeper:zookeeper:3.9.2")
//basicImpl("software.amazon.ion:ion-java:1.5.1") //should use: basicImpl("com.amazon.ion:ion-java:1.11.8")

// connectors
@@ -52,26 +52,21 @@ class DataGeneratorProcessor(dataCatererConfiguration: DataCatererConfiguration)
}
val stepNames = summaryWithTask.map(t => s"task=${t._2.name}, num-steps=${t._2.steps.size}, steps=${t._2.steps.map(_.name).mkString(",")}").mkString("||")

if (summaryWithTask.isEmpty) {
LOGGER.warn("No tasks found or no tasks enabled. No data will be generated or validated")
PlanRunResults(List(), List())
} else {
val generationResult = if (flagsConfig.enableGenerateData) {
LOGGER.debug(s"Following tasks are enabled and will be executed: num-tasks=${summaryWithTask.size}, tasks=$stepNames")
batchDataProcessor.splitAndProcess(plan, summaryWithTask)
} else List()
val generationResult = if (flagsConfig.enableGenerateData && summaryWithTask.nonEmpty) {
LOGGER.debug(s"Following tasks are enabled and will be executed: num-tasks=${summaryWithTask.size}, tasks=$stepNames")
batchDataProcessor.splitAndProcess(plan, summaryWithTask)
} else List()

val validationResults = if (flagsConfig.enableValidation) {
new ValidationProcessor(connectionConfigsByName, optValidations, dataCatererConfiguration.validationConfig, foldersConfig)
.executeValidations
} else List()
val validationResults = if (flagsConfig.enableValidation) {
new ValidationProcessor(connectionConfigsByName, optValidations, dataCatererConfiguration.validationConfig, foldersConfig)
.executeValidations
} else List()

applyPostPlanProcessors(plan, sparkRecordListener, generationResult, validationResults)
val optReportPath = if (flagsConfig.enableSaveReports) {
plan.runId.map(id => s"${foldersConfig.generatedReportsFolderPath}/$id").orElse(Some(foldersConfig.generatedReportsFolderPath))
} else None
PlanRunResults(generationResult, validationResults, optReportPath)
}
applyPostPlanProcessors(plan, sparkRecordListener, generationResult, validationResults)
val optReportPath = if (flagsConfig.enableSaveReports) {
plan.runId.map(id => s"${foldersConfig.generatedReportsFolderPath}/$id").orElse(Some(foldersConfig.generatedReportsFolderPath))
} else None
PlanRunResults(generationResult, validationResults, optReportPath)
}

private def applyPostPlanProcessors(plan: Plan, sparkRecordListener: SparkRecordListener,
@@ -1,75 +1,75 @@
package io.github.datacatering.datacaterer.core.util

import com.google.protobuf.DescriptorProtos
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto
import com.google.protobuf.Descriptors.FieldDescriptor
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

import java.io.{BufferedInputStream, FileInputStream}
import scala.collection.JavaConverters.asScalaBufferConverter

object ProtobufUtil {

def toStructType(descriptorFile: String): Map[String, StructType] = {
val file = new BufferedInputStream(new FileInputStream(descriptorFile))
val fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(file)
fileDescriptorSet.getFileList.asScala
.flatMap(fd => {
fd.getMessageTypeList.asScala.toList.map(message => {
(message.getName, StructType(getSchemaFromFieldsProto(message.getFieldList.asScala.toList)))
})
// (fd.getName, StructType(getSchemaFromFields(fd.getMessageTypeList.asScala.toList)))
}).toMap
}

private def getSchemaFromFields(fields: List[FieldDescriptor]): Array[StructField] = {
fields.map(field => {
val dataType = getDataTypeForField(field)
StructField(field.getName, dataType, !field.isRequired)
}).toArray
}

private def getSchemaFromFieldsProto(fields: List[FieldDescriptorProto]): Array[StructField] = {
fields.map(field => {
val dataType = getDataTypeForField(field)
StructField(field.getName, dataType)
}).toArray
}

private def getDataTypeForField(fieldDescriptor: FieldDescriptor): DataType = {
fieldDescriptor.getJavaType match {
case JavaType.BOOLEAN => DataTypes.BooleanType
case JavaType.INT => DataTypes.IntegerType
case JavaType.LONG => DataTypes.LongType
case JavaType.DOUBLE => DataTypes.DoubleType
case JavaType.FLOAT => DataTypes.FloatType
case JavaType.STRING => DataTypes.StringType
case JavaType.ENUM => DataTypes.StringType
case JavaType.BYTE_STRING => DataTypes.BinaryType
case JavaType.MESSAGE => {
new StructType(getSchemaFromFields(fieldDescriptor.getMessageType.getFields.asScala.toList))
}
case _ => throw new RuntimeException(s"Unable to parse proto type, type=${fieldDescriptor.getType}")
}
}

private def getDataTypeForField(fieldDescriptor: FieldDescriptorProto): DataType = {
// val nonProtoField = FieldDescriptor.Type.valueOf(fieldDescriptor.getType)
FieldDescriptor.Type.valueOf(fieldDescriptor.getType).getJavaType match {
case JavaType.BOOLEAN => DataTypes.BooleanType
case JavaType.INT => DataTypes.IntegerType
case JavaType.LONG => DataTypes.LongType
case JavaType.DOUBLE => DataTypes.DoubleType
case JavaType.FLOAT => DataTypes.FloatType
case JavaType.STRING => DataTypes.StringType
case JavaType.ENUM => DataTypes.StringType
case JavaType.BYTE_STRING => DataTypes.BinaryType
case JavaType.MESSAGE => {
new StructType(getSchemaFromFields(fieldDescriptor.getDescriptorForType.getFields.asScala.toList))
}
case _ => throw new RuntimeException(s"Unable to parse proto type, type=${fieldDescriptor}")
}
}

}
//package io.github.datacatering.datacaterer.core.util
//
//import com.google.protobuf.DescriptorProtos
//import com.google.protobuf.DescriptorProtos.FieldDescriptorProto
//import com.google.protobuf.Descriptors.FieldDescriptor
//import com.google.protobuf.Descriptors.FieldDescriptor.JavaType
//import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
//
//import java.io.{BufferedInputStream, FileInputStream}
//import scala.collection.JavaConverters.asScalaBufferConverter
//
//object ProtobufUtil {
//
// def toStructType(descriptorFile: String): Map[String, StructType] = {
// val file = new BufferedInputStream(new FileInputStream(descriptorFile))
// val fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(file)
// fileDescriptorSet.getFileList.asScala
// .flatMap(fd => {
// fd.getMessageTypeList.asScala.toList.map(message => {
// (message.getName, StructType(getSchemaFromFieldsProto(message.getFieldList.asScala.toList)))
// })
// // (fd.getName, StructType(getSchemaFromFields(fd.getMessageTypeList.asScala.toList)))
// }).toMap
// }
//
// private def getSchemaFromFields(fields: List[FieldDescriptor]): Array[StructField] = {
// fields.map(field => {
// val dataType = getDataTypeForField(field)
// StructField(field.getName, dataType, !field.isRequired)
// }).toArray
// }
//
// private def getSchemaFromFieldsProto(fields: List[FieldDescriptorProto]): Array[StructField] = {
// fields.map(field => {
// val dataType = getDataTypeForField(field)
// StructField(field.getName, dataType)
// }).toArray
// }
//
// private def getDataTypeForField(fieldDescriptor: FieldDescriptor): DataType = {
// fieldDescriptor.getJavaType match {
// case JavaType.BOOLEAN => DataTypes.BooleanType
// case JavaType.INT => DataTypes.IntegerType
// case JavaType.LONG => DataTypes.LongType
// case JavaType.DOUBLE => DataTypes.DoubleType
// case JavaType.FLOAT => DataTypes.FloatType
// case JavaType.STRING => DataTypes.StringType
// case JavaType.ENUM => DataTypes.StringType
// case JavaType.BYTE_STRING => DataTypes.BinaryType
// case JavaType.MESSAGE => {
// new StructType(getSchemaFromFields(fieldDescriptor.getMessageType.getFields.asScala.toList))
// }
// case _ => throw new RuntimeException(s"Unable to parse proto type, type=${fieldDescriptor.getType}")
// }
// }
//
// private def getDataTypeForField(fieldDescriptor: FieldDescriptorProto): DataType = {
// // val nonProtoField = FieldDescriptor.Type.valueOf(fieldDescriptor.getType)
// FieldDescriptor.Type.valueOf(fieldDescriptor.getType).getJavaType match {
// case JavaType.BOOLEAN => DataTypes.BooleanType
// case JavaType.INT => DataTypes.IntegerType
// case JavaType.LONG => DataTypes.LongType
// case JavaType.DOUBLE => DataTypes.DoubleType
// case JavaType.FLOAT => DataTypes.FloatType
// case JavaType.STRING => DataTypes.StringType
// case JavaType.ENUM => DataTypes.StringType
// case JavaType.BYTE_STRING => DataTypes.BinaryType
// case JavaType.MESSAGE => {
// new StructType(getSchemaFromFields(fieldDescriptor.getDescriptorForType.getFields.asScala.toList))
// }
// case _ => throw new RuntimeException(s"Unable to parse proto type, type=${fieldDescriptor}")
// }
// }
//
//}
@@ -0,0 +1,11 @@
name: "account_balance_and_transactions_create_plan"
description: "Create balances and transactions in Postgres"
tasks:
- name: "jdbc_customer_balance_and_transactions"
dataSourceName: "postgres"

sinkOptions:
foreignKeys:
- - "postgres.balances.account_number"
- - "postgres.transactions.account_number"
- []
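A possible reading of the foreignKeys entry above, assuming Data Caterer's [source, generation targets, delete targets] layout (an interpretation, not wording from this PR): account numbers generated for balances are reused for transactions, and no delete relationships are defined.

```yaml
# Interpretation only; assumes the [source, generation targets, delete targets] layout.
sinkOptions:
  foreignKeys:
    - - "postgres.balances.account_number"        # source column
      - - "postgres.transactions.account_number"  # reuses the generated account numbers
      - []                                        # no delete relationships
```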
@@ -0,0 +1,49 @@
name: "jdbc_customer_balance_and_transactions"
steps:
- name: "balances"
type: "postgres"
count:
records: 1000
options:
dbtable: "account.balances"
schema:
fields:
- name: "account_number"
generator:
type: "regex"
options:
regex: "ACC1[0-9]{5,10}"
isUnique: true
- name: "create_time"
type: "timestamp"
- name: "account_status"
type: "string"
generator:
type: "oneOf"
options:
oneOf:
- "open"
- "closed"
- "suspended"
- name: "balance"
type: "double"
- name: "transactions"
type: "postgres"
count:
perColumn:
columnNames:
- "account_number"
count: 5
options:
dbtable: "account.transactions"
schema:
fields:
- name: "account_number"
- name: "create_time"
type: "timestamp"
- name: "transaction_id"
generator:
options:
regex: "txn-[0-9]{10}"
- name: "amount"
type: "double"
@@ -24,7 +24,7 @@ class PlanParserTest extends SparkSuite {
test("Can parse task in YAML file") {
val result = PlanParser.parseTasks("src/test/resources/sample/task")

assert(result.length == 12)
assert(result.length == 13)
}

test("Can parse plan in YAML file with foreign key") {
2 changes: 0 additions & 2 deletions docker-action.sh
@@ -25,6 +25,4 @@ docker buildx create --use --name builder
docker buildx inspect --bootstrap builder

docker buildx build --platform $platforms \
--build-arg "APP_VERSION=$version" \
--build-arg "SPARK_VERSION=$sparkVersion" \
-t datacatering/data-caterer-basic:$version --push .
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,5 +1,5 @@
groupId=io.github.data-catering
version=0.11.7
version=0.11.8

scalaVersion=2.12
scalaSpecificVersion=2.12.19
26 changes: 26 additions & 0 deletions insta-integration.yaml
@@ -0,0 +1,26 @@
services:
  - name: postgres
    data: app/src/test/resources/sample/sql/postgres
run:
  - command: java -jar app/build/libs/data-caterer.jar
    env:
      PLAN_FILE_PATH: app/src/test/resources/sample/plan/account-balance-transaction-plan.yaml
      TASK_FOLDER_PATH: app/src/test/resources/sample/task
    test:
      validation:
        postgres:
          - options:
              dbtable: account.balances
            validations:
              - expr: ISNOTNULL(account_id)
              - aggType: count
                aggExpr: count == 1000
          - options:
              dbtable: account.transactions
            validations:
              - expr: ISNOTNULL(account_id)
              - aggType: count
                aggExpr: count == 5000
              - groupByCols: [account_number]
                aggType: count
                aggExpr: count == 5