Skip to content

Commit

Permalink
Add support-2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
alisakotliarova authored Sep 15, 2021
1 parent 617dfb3 commit 2dcd041
Show file tree
Hide file tree
Showing 11 changed files with 574 additions and 3 deletions.
10 changes: 10 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Guiding Principles for Contribution
First of all, thank you for taking the time to contribute! The HRI Team values your contribution.

In general, contributions can be made using the standard fork and pull request process. We use the [Git Flow](https://nvie.com/posts/a-successful-git-branching-model/) branching model, so branch off of and submit the pull request back to the `develop` branch. If updating an older release, submit a pull request against the associated `support-<major>.x` branch. If one does not exist, contact us, and we will create one.

The GitHub actions may not run successfully in your forked repository without several secrets and external resources used for integration testing. You can ignore this and rely on the actions that will run in our repository when you create the pull request, but you should be able to run local unit tests to test your changes.

Once the pull request is reviewed and approved and all the integration tests pass, we will merge it and handle releasing the updates.

If making a significant contribution, please reach out to the development team's Slack channel, [#health-record-ingestion](https://alvearie.slack.com/archives/C01GM43LFJ6), so that we can coordinate the desired changes.
4 changes: 2 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright [yyyy] [name of copyright owner]
Copyright 2021 IBM

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -198,4 +198,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
4 changes: 4 additions & 0 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# MAINTAINERS

- David N Perkins - david.n.perkins@ibm.com
- Aram Openden - aram.openden1@ibm.com
78 changes: 77 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,78 @@
# hri-flink-pipeline-core
HRI Flink pipeline processing core library
This repo contains the Alvearie Health Record Ingestion Flink pipeline processing core library. It defines a Flink stream processing job for validating records as they flow through the HRI, with an interface for the actual validation of records. This enables the creation of validation jobs that are specific to a data type or use-case, while maintaining a consistent processing architecture. Here is a diagram of the logical process graph.

![Flink pipeline architecture](flink-progess-diagram.png)
*_This is a draw.io diagram embedded in a png. Use draw.io to make changes._

### Input/Sources
Records are read from both the `*.in` and `*.notification` Kafka topics. The `*.notification` topic supplies Batch metadata information to the `Validation Processor` and the `Tracker`.

### Validation Processor
The Validation processor stores Batch notification messages in it's broadcast state, which is used for initial checks for every record. It then calls a provided record validator and outputs the record to the valid or invalid downstream Kafka sinks. Additionally, a message is sent to the Tracker indicating if the record is valid.

Initial record checks:
* record has `batchId` header
* `batchId` exists
* Batch status is not `failed`, `terminated`, or `completed`
* provided Validator checks record

The Validation processor uses the job parallelism, same as both Kafka sources. It uses a Broadcast state, because the stream of records is not keyed, so every instance needs to know about every batch. Keying the stream by batch id would limit the parallelism to 1 for each batch. The parallelism should be less than or equal to the number of Kafka partitions for the `*.in` topic.

When the validation processor attempts to get a batch by batchId or tenantId, if an HTTP error status code of `404` is returned indicating that the batch ID or tenant ID was not found, a warning message will be logged and the records will be written to the `*.invalid` topic. No other errors will be raised. This is because such an error is likely due to a mistake by a data integrator, and does not warrant a more aggressive failure by the validation processor. If any other HTTP status code in the `400`s (e.g. `400-Bad Request`, `401-Unauthorized`) is returned, the validation will immediately fail. This indicates something is wrong with the HTTP requests, and there is little point in retrying. For any other HTTP error status code, the validation processor will retry for 24 hours, with an exponentially increasing amount of time between each attempt (with a maximum wait time of 5 minutes.)

### Tracker
The Tracker keeps track of every batch's progress and calls the HRI Management API when it is complete or fails. Batches are failed if the number of invalid records reaches the threshold, which is set per batch, or if more records are received then expected. Batches are completed once all the records are received and after a configurable delay. The Batch completion delay is used to wait for any additional records, which would fail the batch. Once the batch is completed, additional records are sent to the invalid topic and do not affect the status of the batch.

Because both Batch notification messages and Validation valid count messages could trigger a Batch status change, both of these input streams have to be keyed by the Batch ID. This ensures that all the messages for a given batch are always processed by the same Tracker instance, which enables it to store the total count and Batch notification together.

This however does limit the parallelism of the Tracker. Every Batch can only be processed by one Tracker instance. We don't believe this will be a significant performance/throughput bottleneck since the computation is very minimal. Currently, the Tracker's parallelism is set to one quarter of the job's parallelism.

### Output/Sinks
Records that pass validation are written to `*.out` topic. For invalid records, an Invalid record is written to the `*.invalid` topic. The Invalid records contains the error message and a pointer (topic, partition, offset) to the original record, and is defined in the `hri-api-spec` [repo](https://github.com/Alvearie/hri-api-spec/blob/main/notifications/invalidRecord.json). Once a Batch is failed or terminated, the Validation processor stops forwarding valid or invalid records down stream.

The Tracker uses a HRI Management API sink to complete or fail a Batch, which is described above. The sink calls the Batch `processingComplete` or `fail` endpoints respectively. Note that the HRI Management API will write a new Batch notification reflecting the status change to the `*.notification` topic. This creates an external loop in our flow, but is necessary for the Validation processor to pick up these changes. The Tracker keeps additional state about whether it has completed or failed a batch to prevent infinite loops.

If, when updating a batch as either `processingComplete` or `fail`, the returned HTTP status code is a `409` (Conflict), indicating that the batch is already in the desired state, a warning message will be logged but no other errors will be raised. For all other HTTP status codes in the `400`s (e.g. `400-Bad Request`, `401-Unauthorized`) returned, the attempt to update the status of the batch will immediately fail. This scenario indicates something is wrong with the HTTP requests, and there is little point in retrying. For any other HTTP error status code, the HRI Management API sink will retry the status update for 24 hours, with an exponentially increasing amount of time between each attempt (with a maximum wait time of 5 minutes.)

### Testing
There are three constructors for the Base Validation Job class, one for production and two for testing purposes. One test constructor takes mock sources and sinks for end-to-end unit tests of the entire job. Any changes to the job will require updating and or adding additional end-to-end unit tests to `ValidationJobIntTest.scala`. The other test constructor is for stand-alone integration testing without the HRI Management API. Instead of using the HRI Management API sink, it uses a Kafka sink that writes Batch notification messages directly to the `*.notification` topic. This enables testing error scenarios that are difficult to simulate with the HRI Management API.

## Integration Testing hri-flink-pipeline-core
In order to fully test your branch of hri-flink-pipeline-core, you must create a branch of a hri-flink-validation repo, set it to point to your branch and run the test on that branch as well.

### Set up the hri-flink-validation branch
First, create and set up your hri-flink-validation branch. You only have to do this once per hri-flink-pipeline-core branch. The purpose of this set-up is to configure the flink validation code to use your branch of hri-flink-pipeline-core instead of `develop`, so you can verify the integration tests still pass with your changes.

1. If you haven't already, create a branch in one of the hri-flink-validation repo's, such as `hri-flink-validation-fhir`, and name it the same as this branch. Check out your new branch.

1. In the hri-flink-validation branch, find the file `./build.gradle`. Look for the line similar to `pipelineCoreVersion = 'develop-SNAPSHOT'`. In this line, replace the word `develop` with the name of your branch. For example, if you are working in the branch `WHFHRI-000`, you would change the line to `pipelineCoreVersion = 'WHFHRI-000-SNAPSHOT'`.

1. Save, commit and push the changes you just made.

### Test your branch
Next, these are the steps to actively test your work in hri-flink-pipeline-core branch. You may have to go through these steps multiple times.

1. Make/Fix your changes.

1. `cd` into this repo, then run `./gradlew clean build`. If there are errors, go back to the first step.

1. Commit your changes and push them to your branch.

1. Go to GitHub actions and wait for it to rebuild and publish your branch.

1. For the hri-flink-validation repo you decided to test with in the first section, go to its GitHub actions and re-run the integration tests. This time they should run using your hri-flink-pipeline-core branch's build.

1. If all builds pass, your branch is properly tested against the validation library.

### Final Steps
After you pass all the tests, create a PR, have the PR approved, merge/rebase this branch back into develop, and then delete this branch... The very last step is to delete the hri-flink-validation branch you made.

## Releases
Releases are created by creating Git tags, which trigger a GitHub Actions build that publishes a release version in GitHub packages, see [Overall strategy](https://github.com/Alvearie/HRI/wiki/Overall-Project-Branching,-Test,-and-Release-Strategy) for more details.

## Contribution Guide
Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct, and the process for submitting pull requests to us.

# Communication
Please [join](https://alvearie.io/contributions/requestSlackAccess/) our Slack channel for further questions: `#health-record-ingestion`
Please see recent contributors or [maintainers](MAINTAINERS.md)
172 changes: 172 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/**
* (C) Copyright IBM Corp. 2021
*
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
// Apply the scala plugin to add support for Scala
id 'scala'
id 'maven-publish'
id "com.github.maiflai.scalatest" version "0.25"
id "org.scoverage" version "4.0.1"
}

group = 'org.alvearie.hri.flink'
version = '2.1-1.0.1'
description = """HRI Flink Pipeline Core Library"""

ext {
javaVersion = '1.8'
flinkVersion = '1.10.0'
scalaBinaryVersion = '2.12'
scalaVersion = '2.12.11'
scalaTestVersion = '3.1.1'
slf4jVersion = '1.7.7'
log4jVersion = '1.2.17'
jacksonVersion = '2.12.0'
branch = System.getenv('TRAVIS_BRANCH') != null
? System.getenv('TRAVIS_BRANCH')
: getWorkingBranch()

}

// If not running in travis add 'local' to the version to support local development
if (System.getenv('TRAVIS_BRANCH') == null || System.getenv('TRAVIS_BRANCH') == "") {
version = "${branch}-local-SNAPSHOT"
} else if (System.getenv('TRAVIS_TAG') == null || System.getenv('TRAVIS_TAG') == "") {
version = "${branch}-SNAPSHOT"
} else if (System.getenv('TRAVIS_TAG') == "v${version}") {
version = "${version}"
} else {
throw new InvalidUserDataException(String.format("The tag '%s' does not match with the current release version '%s'",System.getenv('TRAVIS_TAG'),"${version}"));
}

task jarTests(type: Jar) {
from sourceSets.test.output
archiveClassifier = 'tests'
archiveName "hri-flink-pipeline-core-tests.jar"
}

sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
}

// declare where to find the dependencies of your project
repositories {
['TBD'].each { repo ->
maven {
credentials {
username = findProperty('user') ?: System.getenv('user')
password = findProperty('password') ?: System.getenv('password')
}

url "TBD/$repo/"
}
}
mavenCentral()
mavenLocal()
}

dependencies {
// Scala lib
implementation "org.scala-lang:scala-library:${scalaVersion}"

// --------------------------------------------------------------
// Flink dependencies that should not be included as transitive
// dependencies of this library, because they should not be
// included in Flink job jars.
// --------------------------------------------------------------
compileOnly "org.apache.flink:flink-scala_${scalaBinaryVersion}:${flinkVersion}"
compileOnly "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"
compileOnly "log4j:log4j:${log4jVersion}"
compileOnly "org.slf4j:slf4j-log4j12:${slf4jVersion}"
testImplementation "org.apache.flink:flink-scala_${scalaBinaryVersion}:${flinkVersion}"
testImplementation "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"
testImplementation "log4j:log4j:${log4jVersion}"
testImplementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"

// --------------------------------------------------------------
// Dependencies that library users should include in their job
// shadow jar
// --------------------------------------------------------------
implementation "org.alvearie.hri:hri-api-batch-notification:2.1-2.0.1"
implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation "com.fasterxml.jackson.module:jackson-module-scala_${scalaBinaryVersion}:${jacksonVersion}"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
implementation "org.apache.httpcomponents:httpclient:4.5.13"
implementation "commons-codec:commons-codec:1.15" // force upgade of httpclient dependency due to vulnerability

//Test dependencies here:
testImplementation "org.scalactic:scalactic_${scalaBinaryVersion}:${scalaTestVersion}"
testImplementation "org.scalatest:scalatest_${scalaBinaryVersion}:${scalaTestVersion}"
testImplementation "org.mockito:mockito-scala_${scalaBinaryVersion}:1.14.4"
testImplementation "org.mockito:mockito-scala-scalatest_${scalaBinaryVersion}:1.14.4"
testImplementation "org.apache.flink:flink-tests:${flinkVersion}:tests"
testImplementation "org.apache.flink:flink-test-utils_${scalaBinaryVersion}:${flinkVersion}"
testImplementation "org.apache.flink:flink-runtime_${scalaBinaryVersion}:${flinkVersion}:tests"
testImplementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}:tests"

testRuntimeOnly "com.vladsch.flexmark:flexmark-all:0.35.10"
testRuntime 'org.pegdown:pegdown:1.4.2'
testCompile "info.picocli:picocli:4.2.0"
}

publishing {
publications {
mavenJava(MavenPublication) {
from components.java
artifactId = 'hri-flink-pipeline-core'
}
mavenJava(MavenPublication) {
artifact jarTests
}
}

repositories {
maven {
url 'TBD'
credentials {
username findProperty('user') ?: System.getenv('user')
password findProperty('password') ?: System.getenv('password')
}
}
}
}

// this is for App Scan
task copyDependencies(type: Copy) {
from configurations.default
into 'dependencies'
}

scoverage {
scoverageScalaVersion = scalaBinaryVersion
coverageOutputXML = true
coverageOutputHTML = true
}
reportScoverage.doLast {
println "Scoverage report:\n file:///$buildDir/reports/scoverage/index.html"
}

reportScoverage.mustRunAfter test
jarTests.dependsOn 'test'
build.dependsOn reportScoverage

/**
* Get the name of the working branch of the project
*
* @return Name of the working branch
*/
def getWorkingBranch() {
// Triple double-quotes for the breaklines
def workingBranch = """git --git-dir=${rootDir}/.git
--work-tree=${rootDir}
rev-parse --abbrev-ref HEAD""".execute().text.trim()
println "Working branch: " + workingBranch
return workingBranch
}

Binary file added flink-progess-diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
5 changes: 5 additions & 0 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.5.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Loading

0 comments on commit 2dcd041

Please sign in to comment.