From 0244cfd02b71b05fe48a10093207f76e8efdae5e Mon Sep 17 00:00:00 2001 From: Yanjun Zhou Date: Tue, 13 Sep 2022 19:38:27 -0700 Subject: [PATCH] Adopt Golang version ClickHouse data schema management Signed-off-by: Yanjun Zhou --- .github/workflows/build.yml | 31 ++ .github/workflows/build_tag.yml | 17 +- .github/workflows/docker_update_theia.yml | 9 - .github/workflows/kind.yml | 42 ++- Makefile | 12 + build/charts/theia/README.md | 2 +- .../theia/provisioning/datasources/init.sh | 16 +- .../theia/provisioning/datasources/migrate.sh | 128 ------- .../migrators/000001_0-1-0.down.sql | 0 .../datasources/migrators/000001_0-1-0.up.sql | 84 +++++ .../{0-2-0_0-1-0.sql => 0-3-0_0-2-0.sql} | 0 .../migrators/upgrade/0-1-0_0-2-0.sql | 262 -------------- build/charts/theia/templates/_helpers.tpl | 48 ++- .../clickhouse/clickhouseinstallation.yaml | 6 +- .../theia/templates/clickhouse/configmap.yaml | 2 +- build/charts/theia/values.yaml | 2 +- .../Dockerfile.clickhouse-server.ubuntu | 14 + build/yamls/flow-visibility.yml | 338 ++---------------- ci/jenkins/test-vmc.sh | 7 +- ci/kind/test-e2e-kind.sh | 4 +- ci/kind/test-upgrade-theia.sh | 12 +- go.mod | 1 + go.sum | 2 + hack/generate-manifest.sh | 5 + plugins/clickhouse-schema-management/main.go | 338 ++++++++++++++++++ .../clickhouse-schema-management/main_test.go | 316 ++++++++++++++++ test/e2e/upgrade_test.go | 16 +- 27 files changed, 968 insertions(+), 746 deletions(-) delete mode 100755 build/charts/theia/provisioning/datasources/migrate.sh create mode 100644 build/charts/theia/provisioning/datasources/migrators/000001_0-1-0.down.sql create mode 100644 build/charts/theia/provisioning/datasources/migrators/000001_0-1-0.up.sql rename build/charts/theia/provisioning/datasources/migrators/downgrade/{0-2-0_0-1-0.sql => 0-3-0_0-2-0.sql} (100%) delete mode 100644 build/charts/theia/provisioning/datasources/migrators/upgrade/0-1-0_0-2-0.sql create mode 100644 build/images/Dockerfile.clickhouse-server.ubuntu create mode 100644 
plugins/clickhouse-schema-management/main.go create mode 100644 plugins/clickhouse-schema-management/main_test.go diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b5850dd0..43097f81 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -43,6 +43,37 @@ jobs: run: | echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin docker push antrea/theia-clickhouse-monitor:latest + + check-clickhouse-server-changes: + name: Check whether clickhouse-server image needs to be built based on diff + runs-on: [ubuntu-latest] + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + - uses: antrea-io/has-changes@v2 + id: check_diff + with: + paths: plugins/clickhouse-schema-management/* build/images/Dockerfile.clickhouse-server.ubuntu + outputs: + has_changes: ${{ steps.check_diff.outputs.has_changes }} + + build-clickhouse-server: + needs: check-clickhouse-server-changes + if: ${{ needs.check-clickhouse-server-changes.outputs.has_changes == 'yes' || github.event_name == 'push' }} + runs-on: [ubuntu-latest] + steps: + - uses: actions/checkout@v2 + - name: Build clickhouse-server Docker image + run: make clickhouse-server + - name: Push clickhouse-server Docker image to registry + if: ${{ github.repository == 'antrea-io/theia' && github.event_name == 'push' && github.ref == 'refs/heads/main' }} + env: + DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} + DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} + run: | + echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin + docker push antrea/theia-clickhouse-server:latest check-policy-recommendation-changes: name: Check whether policy-recommendation image needs to be built based on diff diff --git a/.github/workflows/build_tag.yml b/.github/workflows/build_tag.yml index eb1bc780..1ddf6cf4 100644 --- a/.github/workflows/build_tag.yml +++ b/.github/workflows/build_tag.yml @@ -33,7 +33,22 @@ jobs: make clickhouse-monitor echo 
"$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin docker push antrea/theia-clickhouse-monitor:"${VERSION}" - + + build-clickhouse-server: + runs-on: [ubuntu-latest] + needs: get-version + steps: + - uses: actions/checkout@v2 + - name: Build clickhouse-server Docker image and push to registry + env: + DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} + DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} + VERSION: ${{ needs.get-version.outputs.version }} + run: | + make clickhouse-server + echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin + docker push antrea/theia-clickhouse-server:"${VERSION}" + build-policy-recommendation: runs-on: [ubuntu-latest] needs: get-version diff --git a/.github/workflows/docker_update_theia.yml b/.github/workflows/docker_update_theia.yml index 4cad3660..0602e746 100644 --- a/.github/workflows/docker_update_theia.yml +++ b/.github/workflows/docker_update_theia.yml @@ -13,9 +13,6 @@ on: grafana-tag: description: 'Tag for grafana Docker image' required: true - ch-server-tag: - description: 'Tag for clickhouse-server Docker image' - required: true spark-operator-tag: description: 'Tag for spark-operator Docker image' required: true @@ -50,12 +47,6 @@ jobs: src: docker.io/grafana/grafana:${{ github.event.inputs.grafana-tag }} dst: | docker.io/antrea/theia-grafana:${{ github.event.inputs.grafana-tag }} - - name: Push antrea/theia-clickhouse-server - uses: akhilerm/tag-push-action@v2.0.0 - with: - src: docker.io/clickhouse/clickhouse-server:${{ github.event.inputs.ch-server-tag }} - dst: | - docker.io/antrea/theia-clickhouse-server:${{ github.event.inputs.ch-server-tag }} - name: Push antrea/theia-spark-operator uses: akhilerm/tag-push-action@v2.0.0 with: diff --git a/.github/workflows/kind.yml b/.github/workflows/kind.yml index cf3eea7e..58d9de1b 100644 --- a/.github/workflows/kind.yml +++ b/.github/workflows/kind.yml @@ -63,11 +63,29 @@ jobs: path: clickhouse-monitor.tar retention-days: 1 # minimum 
value, in case artifact deletion by 'artifact-cleanup' job fails + build-clickhouse-server-image: + name: Build ClickHouse server image to be used for Kind e2e tests + needs: check-changes + if: ${{ needs.check-changes.outputs.has_changes == 'yes' }} + runs-on: [ ubuntu-latest ] + steps: + - uses: actions/checkout@v3 + - run: make clickhouse-server + - name: Save ClickHouse server image to tarball + run: docker save -o clickhouse-server.tar antrea/theia-clickhouse-server + - name: Upload ClickHouse server image for subsequent jobs + uses: actions/upload-artifact@v3 + with: + name: clickhouse-server + path: clickhouse-server.tar + retention-days: 1 # minimum value, in case artifact deletion by 'artifact-cleanup' job fails + test-e2e-encap: name: E2e tests on a Kind cluster on Linux needs: - build-policy-recommendation-image - build-clickhouse-monitor-image + - build-clickhouse-server-image runs-on: [ubuntu-latest] steps: - name: Free disk space @@ -87,12 +105,18 @@ jobs: uses: actions/download-artifact@v3 with: name: clickhouse-monitor + - name: Download ClickHouse server images from previous jobs + uses: actions/download-artifact@v3 + with: + name: clickhouse-server - name: Load Theia image run: | docker load -i policy-recommendation.tar docker tag antrea/theia-policy-recommendation:latest projects.registry.vmware.com/antrea/theia-policy-recommendation:latest docker load -i clickhouse-monitor.tar docker tag antrea/theia-clickhouse-monitor:latest projects.registry.vmware.com/antrea/theia-clickhouse-monitor:latest + docker load -i clickhouse-server.tar + docker tag antrea/theia-clickhouse-server:latest projects.registry.vmware.com/antrea/theia-clickhouse-server:latest - name: Install Kind run: | curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 @@ -115,7 +139,9 @@ jobs: test-upgrade-from-N-1: name: Upgrade from Theia version N-1 - needs: build-clickhouse-monitor-image + needs: + - 
build-clickhouse-monitor-image + - build-clickhouse-server-image runs-on: [ubuntu-latest] steps: - name: Free disk space @@ -135,6 +161,14 @@ jobs: run: | docker load -i clickhouse-monitor.tar docker tag antrea/theia-clickhouse-monitor:latest projects.registry.vmware.com/antrea/theia-clickhouse-monitor:latest + - name: Download ClickHouse server images from previous jobs + uses: actions/download-artifact@v3 + with: + name: clickhouse-server + - name: Load Theia image + run: | + docker load -i clickhouse-server.tar + docker tag antrea/theia-clickhouse-server:latest projects.registry.vmware.com/antrea/theia-clickhouse-server:latest - name: Install Kind run: | curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 @@ -164,6 +198,7 @@ jobs: needs: - build-policy-recommendation-image - build-clickhouse-monitor-image + - build-clickhouse-server-image - test-e2e-encap - test-upgrade-from-N-1 runs-on: [ubuntu-latest] @@ -178,3 +213,8 @@ jobs: uses: geekyeggo/delete-artifact@v1 with: name: clickhouse-monitor + - name: Delete clickhouse-server + if: ${{ needs.build-clickhouse-server-image.result == 'success' }} + uses: geekyeggo/delete-artifact@v1 + with: + name: clickhouse-server diff --git a/Makefile b/Makefile index 01414eb6..2830d0e2 100644 --- a/Makefile +++ b/Makefile @@ -175,6 +175,18 @@ theia-manager: theia-manager-bin: @mkdir -p $(BINDIR) GOOS=linux $(GO) build -o $(BINDIR) $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/cmd/theia-manager +.PHONY: clickhouse-server +clickhouse-server: + @echo "===> Building antrea/theia-clickhouse-server Docker image <===" + docker build --pull -t antrea/theia-clickhouse-server:$(DOCKER_IMG_VERSION) -f build/images/Dockerfile.clickhouse-server.ubuntu $(DOCKER_BUILD_ARGS) . 
+ docker tag antrea/theia-clickhouse-server:$(DOCKER_IMG_VERSION) antrea/theia-clickhouse-server + docker tag antrea/theia-clickhouse-server:$(DOCKER_IMG_VERSION) projects.registry.vmware.com/antrea/theia-clickhouse-server + docker tag antrea/theia-clickhouse-server:$(DOCKER_IMG_VERSION) projects.registry.vmware.com/antrea/theia-clickhouse-server:$(DOCKER_IMG_VERSION) + +.PHONY: clickhouse-schema-management-plugin +clickhouse-schema-management-plugin: + @mkdir -p $(BINDIR) + GOOS=linux $(GO) build -o $(BINDIR) $(GOFLAGS) -ldflags '$(LDFLAGS)' antrea.io/theia/plugins/clickhouse-schema-management .PHONY: policy-recommendation policy-recommendation: diff --git a/build/charts/theia/README.md b/build/charts/theia/README.md index 977975fd..dfe8739f 100644 --- a/build/charts/theia/README.md +++ b/build/charts/theia/README.md @@ -26,7 +26,7 @@ Kubernetes: `>= 1.16.0-0` | clickhouse.cluster.shards | int | `1` | Number of ClickHouse shards in the cluster. | | clickhouse.cluster.zookeeperHosts | list | `[]` | To use a pre-installed ZooKeeper for ClickHouse data replication, please provide a list of your ZooKeeper hosts. To install a customized ZooKeeper, refer to | | clickhouse.connectionSecret | object | `{"password":"clickhouse_operator_password","username":"clickhouse_operator"}` | Credentials to connect to ClickHouse. They will be stored in a secret. | -| clickhouse.image | object | `{"pullPolicy":"IfNotPresent","repository":"projects.registry.vmware.com/antrea/theia-clickhouse-server","tag":"22.6"}` | Container image used by ClickHouse. | +| clickhouse.image | object | `{"pullPolicy":"IfNotPresent","repository":"projects.registry.vmware.com/antrea/theia-clickhouse-server","tag":""}` | Container image used by ClickHouse. | | clickhouse.monitor.deletePercentage | float | `0.5` | The percentage of records in ClickHouse that will be deleted when the storage grows above threshold. Vary from 0 to 1. 
| | clickhouse.monitor.enable | bool | `true` | Determine whether to run a monitor to periodically check the ClickHouse memory usage and clean data. | | clickhouse.monitor.execInterval | string | `"1m"` | The time interval between two round of monitoring. Can be a plain integer using one of these unit suffixes ns, us (or µs), ms, s, m, h. | diff --git a/build/charts/theia/provisioning/datasources/init.sh b/build/charts/theia/provisioning/datasources/init.sh index 4a61123b..0f77240a 100644 --- a/build/charts/theia/provisioning/datasources/init.sh +++ b/build/charts/theia/provisioning/datasources/init.sh @@ -18,9 +18,21 @@ set -e THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -source $THIS_DIR/migrate.sh source $THIS_DIR/create_table.sh -migrate +# This function is kept for compatibility with versions below v0.3.0 +function setDataVersion { + tables=$(clickhouse client -h 127.0.0.1 -q "SHOW TABLES") + if [[ $tables == *"migrate_version"* ]]; then + clickhouse client -h 127.0.0.1 -q "ALTER TABLE migrate_version DELETE WHERE version!=''" + else + clickhouse client -h 127.0.0.1 -q "CREATE TABLE migrate_version (version String) engine=MergeTree ORDER BY version" + fi + clickhouse client -h 127.0.0.1 -q "INSERT INTO migrate_version (*) VALUES ('{{ .Chart.Version }}')" + echo "=== Set data schema version to {{ .Chart.Version }} ===" +} + +../clickhouse-schema-management createTable setDataVersion + diff --git a/build/charts/theia/provisioning/datasources/migrate.sh b/build/charts/theia/provisioning/datasources/migrate.sh deleted file mode 100755 index a1824c07..00000000 --- a/build/charts/theia/provisioning/datasources/migrate.sh +++ /dev/null @@ -1,128 +0,0 @@ -#!/usr/bin/env bash - -# Copyright 2022 Antrea Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -function checkDataVersion { - tables=$(clickhouse client -h 127.0.0.1 -q "SHOW TABLES") - if [[ $tables == *"migrate_version"* ]]; then - dataVersion=$(clickhouse client -h 127.0.0.1 -q "SELECT version FROM migrate_version") - elif [[ $tables == *"flows"* ]]; then - dataVersion="0.1.0" - fi -} - -function setDataVersion { - tables=$(clickhouse client -h 127.0.0.1 -q "SHOW TABLES") - if [[ $tables == *"migrate_version"* ]]; then - clickhouse client -h 127.0.0.1 -q "ALTER TABLE migrate_version DELETE WHERE version!=''" - else - clickhouse client -h 127.0.0.1 -q "CREATE TABLE migrate_version (version String) engine=MergeTree ORDER BY version" - fi - clickhouse client -h 127.0.0.1 -q "INSERT INTO migrate_version (*) VALUES ('{{ .Chart.Version }}')" - echo "=== Set data schema version to {{ .Chart.Version }} ===" -} - -function addVersionsToList { - if [[ "$versionListStr" != *"$1"* ]]; then - versionListStr+="$1," - fi -} - -function getMigrationPath { - # Get versions based on the SQL file names - versionListStr="" - for fileName in $(ls "$migratorDir/upgrade") - do - # fileName upgrading from v0.1.0 to v0.2.0: 0-1-0_0-2-0.sql - fileName=$(basename $fileName .sql) - versionPair=(${fileName//_/ }) - addVersionsToList ${versionPair[0]//-/.} - addVersionsToList ${versionPair[1]//-/.} - done - addVersionsToList $dataVersion - addVersionsToList $theiaVersion - # Sort the versions to generate the migration path - versionList=(${versionListStr//,/ }) - old_IFS=$IFS - IFS=$'\n' sortedversionList=($(sort -V <<<"${versionList[*]}")) - IFS=$old_IFS - # Define the 
upgrading/downgrading path - index=0 - prev="" - for version in "${sortedversionList[@]}" - do - migratePathStr+="$index:$version, " - index=$((index+1)) - if [[ -z "$prev" ]]; then - prev=${version//./-} - else - curr=${version//./-} - UPGRADING_MIGRATORS+=("${prev}_${curr}.sql") - DOWNGRADING_MIGRATORS+=("${curr}_${prev}.sql") - prev=$curr - fi - done -} - -function version_lt() { test "$(printf '%s\n' "$@" | sort -rV | head -n 1)" != "$1"; } - -function migrate { - THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" - migratorDir="/var/lib/clickhouse/migrators" - mkdir -p $migratorDir - - checkDataVersion - if [[ -z "$dataVersion" ]]; then - cp -r $THIS_DIR/migrators/* $migratorDir - echo "=== No existing data schema. Migration skipped. ===" - return 0 - fi - if [[ -z "$THEIA_VERSION" ]]; then - echo "=== Unable to load the environment variable THEIA_VERSION. Migration failed. ===" - exit 1 - fi - theiaVersion=$THEIA_VERSION - - migratePathStr="" - getMigrationPath - - cp -r $THIS_DIR/migrators/* $migratorDir - - dataVersionIndex=$(echo $migratePathStr | tr ', ' '\n' | grep "$dataVersion" | sed 's/:/ /g' | awk '{print $1}') - theiaVersionIndex=$(echo $migratePathStr | tr ', ' '\n' | grep "$theiaVersion" | sed 's/:/ /g' | awk '{print $1}') - - # Update along the path - if [[ "$dataVersionIndex" -lt "$theiaVersionIndex" ]]; then - for i in $(seq $dataVersionIndex $((theiaVersionIndex-1)) ); - do - if test -f "$migratorDir/upgrade/${UPGRADING_MIGRATORS[$i]}"; then - echo "=== Apply file ${UPGRADING_MIGRATORS[$i]} ===" - clickhouse client -h 127.0.0.1 --queries-file $migratorDir/upgrade/${UPGRADING_MIGRATORS[$i]} - fi - done - # Downgrade along the path - elif [[ "$dataVersionIndex" -gt "$theiaVersionIndex" ]]; then - for i in $(seq $((dataVersionIndex-1)) -1 $theiaVersionIndex); - do - if test -f "$migratorDir/downgrade/${DOWNGRADING_MIGRATORS[$i]}"; then - echo "=== Apply file ${DOWNGRADING_MIGRATORS[$i]} ===" - clickhouse client -h 
127.0.0.1 --queries-file $migratorDir/downgrade/${DOWNGRADING_MIGRATORS[$i]} - fi - done - else - echo "=== Data schema version is the same as Theia version. Migration finished. ===" - fi - setDataVersion -} diff --git a/build/charts/theia/provisioning/datasources/migrators/000001_0-1-0.down.sql b/build/charts/theia/provisioning/datasources/migrators/000001_0-1-0.down.sql new file mode 100644 index 00000000..e69de29b diff --git a/build/charts/theia/provisioning/datasources/migrators/000001_0-1-0.up.sql b/build/charts/theia/provisioning/datasources/migrators/000001_0-1-0.up.sql new file mode 100644 index 00000000..d5c86029 --- /dev/null +++ b/build/charts/theia/provisioning/datasources/migrators/000001_0-1-0.up.sql @@ -0,0 +1,84 @@ +{{- $ttl := split " " .Values.clickhouse.ttl }} +{{- $ttlTimeout := 14400 }} +{{- if eq $ttl._1 "SECOND" }} +{{- $ttlTimeout = min $ttl._0 $ttlTimeout }} +{{- else if eq $ttl._1 "MINUTE" }} +{{- $ttlTimeout = min (mul $ttl._0 60) $ttlTimeout }} +{{- else if eq $ttl._1 "HOUR" }} +{{- $ttlTimeout = min (mul $ttl._0 60 60) $ttlTimeout }} +{{- end }} +--Create a table to store records +CREATE TABLE IF NOT EXISTS flows_local ( + timeInserted DateTime DEFAULT now(), + flowStartSeconds DateTime, + flowEndSeconds DateTime, + flowEndSecondsFromSourceNode DateTime, + flowEndSecondsFromDestinationNode DateTime, + flowEndReason UInt8, + sourceIP String, + destinationIP String, + sourceTransportPort UInt16, + destinationTransportPort UInt16, + protocolIdentifier UInt8, + packetTotalCount UInt64, + octetTotalCount UInt64, + packetDeltaCount UInt64, + octetDeltaCount UInt64, + reversePacketTotalCount UInt64, + reverseOctetTotalCount UInt64, + reversePacketDeltaCount UInt64, + reverseOctetDeltaCount UInt64, + sourcePodName String, + sourcePodNamespace String, + sourceNodeName String, + destinationPodName String, + destinationPodNamespace String, + destinationNodeName String, + destinationClusterIP String, + destinationServicePort UInt16, + 
destinationServicePortName String, + ingressNetworkPolicyName String, + ingressNetworkPolicyNamespace String, + ingressNetworkPolicyRuleName String, + ingressNetworkPolicyRuleAction UInt8, + ingressNetworkPolicyType UInt8, + egressNetworkPolicyName String, + egressNetworkPolicyNamespace String, + egressNetworkPolicyRuleName String, + egressNetworkPolicyRuleAction UInt8, + egressNetworkPolicyType UInt8, + tcpState String, + flowType UInt8, + sourcePodLabels String, + destinationPodLabels String, + throughput UInt64, + reverseThroughput UInt64, + throughputFromSourceNode UInt64, + throughputFromDestinationNode UInt64, + reverseThroughputFromSourceNode UInt64, + reverseThroughputFromDestinationNode UInt64, + trusted UInt8 DEFAULT 0 +) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') +ORDER BY (timeInserted, flowEndSeconds) +TTL timeInserted + INTERVAL {{ .Values.clickhouse.ttl }} +SETTINGS merge_with_ttl_timeout = {{ $ttlTimeout }}; + +--Move data from old table and drop old tables +INSERT INTO flows_local SELECT * FROM flows; +DROP TABLE flows; +DROP VIEW flows_pod_view; +DROP VIEW flows_node_view; +DROP VIEW flows_policy_view; + +--Create a table to store the network policy recommendation results +CREATE TABLE IF NOT EXISTS recommendations_local ( + id String, + type String, + timeCreated DateTime, + yamls String +) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') +ORDER BY (timeCreated); + +--Move data from old table and drop the old table +INSERT INTO recommendations_local SELECT * FROM recommendations; +DROP TABLE recommendations; diff --git a/build/charts/theia/provisioning/datasources/migrators/downgrade/0-2-0_0-1-0.sql b/build/charts/theia/provisioning/datasources/migrators/downgrade/0-3-0_0-2-0.sql similarity index 100% rename from build/charts/theia/provisioning/datasources/migrators/downgrade/0-2-0_0-1-0.sql rename to 
build/charts/theia/provisioning/datasources/migrators/downgrade/0-3-0_0-2-0.sql diff --git a/build/charts/theia/provisioning/datasources/migrators/upgrade/0-1-0_0-2-0.sql b/build/charts/theia/provisioning/datasources/migrators/upgrade/0-1-0_0-2-0.sql deleted file mode 100644 index 86897b56..00000000 --- a/build/charts/theia/provisioning/datasources/migrators/upgrade/0-1-0_0-2-0.sql +++ /dev/null @@ -1,262 +0,0 @@ -{{- $ttl := split " " .Values.clickhouse.ttl }} -{{- $ttlTimeout := 14400 }} -{{- if eq $ttl._1 "SECOND" }} -{{- $ttlTimeout = min $ttl._0 $ttlTimeout }} -{{- else if eq $ttl._1 "MINUTE" }} -{{- $ttlTimeout = min (mul $ttl._0 60) $ttlTimeout }} -{{- else if eq $ttl._1 "HOUR" }} -{{- $ttlTimeout = min (mul $ttl._0 60 60) $ttlTimeout }} -{{- end }} ---Create a table to store records -CREATE TABLE IF NOT EXISTS flows_local ( - timeInserted DateTime DEFAULT now(), - flowStartSeconds DateTime, - flowEndSeconds DateTime, - flowEndSecondsFromSourceNode DateTime, - flowEndSecondsFromDestinationNode DateTime, - flowEndReason UInt8, - sourceIP String, - destinationIP String, - sourceTransportPort UInt16, - destinationTransportPort UInt16, - protocolIdentifier UInt8, - packetTotalCount UInt64, - octetTotalCount UInt64, - packetDeltaCount UInt64, - octetDeltaCount UInt64, - reversePacketTotalCount UInt64, - reverseOctetTotalCount UInt64, - reversePacketDeltaCount UInt64, - reverseOctetDeltaCount UInt64, - sourcePodName String, - sourcePodNamespace String, - sourceNodeName String, - destinationPodName String, - destinationPodNamespace String, - destinationNodeName String, - destinationClusterIP String, - destinationServicePort UInt16, - destinationServicePortName String, - ingressNetworkPolicyName String, - ingressNetworkPolicyNamespace String, - ingressNetworkPolicyRuleName String, - ingressNetworkPolicyRuleAction UInt8, - ingressNetworkPolicyType UInt8, - egressNetworkPolicyName String, - egressNetworkPolicyNamespace String, - egressNetworkPolicyRuleName String, - 
egressNetworkPolicyRuleAction UInt8, - egressNetworkPolicyType UInt8, - tcpState String, - flowType UInt8, - sourcePodLabels String, - destinationPodLabels String, - throughput UInt64, - reverseThroughput UInt64, - throughputFromSourceNode UInt64, - throughputFromDestinationNode UInt64, - reverseThroughputFromSourceNode UInt64, - reverseThroughputFromDestinationNode UInt64, - trusted UInt8 DEFAULT 0 -) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') -ORDER BY (timeInserted, flowEndSeconds) -TTL timeInserted + INTERVAL {{ .Values.clickhouse.ttl }} -SETTINGS merge_with_ttl_timeout = {{ $ttlTimeout }}; - ---Move data from old table and drop old tables -INSERT INTO flows_local SELECT * FROM flows; -DROP TABLE flows; -DROP VIEW flows_pod_view; -DROP VIEW flows_node_view; -DROP VIEW flows_policy_view; - ---Create a Materialized View to aggregate data for pods -CREATE MATERIALIZED VIEW IF NOT EXISTS flows_pod_view_local -ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') -ORDER BY ( - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourcePodName, - destinationPodName, - destinationIP, - destinationServicePort, - destinationServicePortName, - flowType, - sourcePodNamespace, - destinationPodNamespace, - sourceTransportPort, - destinationTransportPort) -TTL timeInserted + INTERVAL {{ .Values.clickhouse.ttl }} -SETTINGS merge_with_ttl_timeout = {{ $ttlTimeout }} -POPULATE -AS SELECT - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourcePodName, - destinationPodName, - destinationIP, - destinationServicePort, - destinationServicePortName, - flowType, - sourcePodNamespace, - destinationPodNamespace, - sourceTransportPort, - destinationTransportPort, - sum(octetDeltaCount) AS octetDeltaCount, - sum(reverseOctetDeltaCount) AS reverseOctetDeltaCount, - sum(throughput) AS throughput, - 
sum(reverseThroughput) AS reverseThroughput, - sum(throughputFromSourceNode) AS throughputFromSourceNode, - sum(throughputFromDestinationNode) AS throughputFromDestinationNode -FROM flows_local -GROUP BY - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourcePodName, - destinationPodName, - destinationIP, - destinationServicePort, - destinationServicePortName, - flowType, - sourcePodNamespace, - destinationPodNamespace, - sourceTransportPort, - destinationTransportPort; - ---Create a Materialized View to aggregate data for nodes -CREATE MATERIALIZED VIEW IF NOT EXISTS flows_node_view_local -ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') -ORDER BY ( - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourceNodeName, - destinationNodeName, - sourcePodNamespace, - destinationPodNamespace) -TTL timeInserted + INTERVAL {{ .Values.clickhouse.ttl }} -SETTINGS merge_with_ttl_timeout = {{ $ttlTimeout }} -POPULATE -AS SELECT - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourceNodeName, - destinationNodeName, - sourcePodNamespace, - destinationPodNamespace, - sum(octetDeltaCount) AS octetDeltaCount, - sum(reverseOctetDeltaCount) AS reverseOctetDeltaCount, - sum(throughput) AS throughput, - sum(reverseThroughput) AS reverseThroughput, - sum(throughputFromSourceNode) AS throughputFromSourceNode, - sum(reverseThroughputFromSourceNode) AS reverseThroughputFromSourceNode, - sum(throughputFromDestinationNode) AS throughputFromDestinationNode, - sum(reverseThroughputFromDestinationNode) AS reverseThroughputFromDestinationNode -FROM flows_local -GROUP BY - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourceNodeName, - destinationNodeName, - sourcePodNamespace, - destinationPodNamespace; - ---Create a 
Materialized View to aggregate data for network policies -CREATE MATERIALIZED VIEW IF NOT EXISTS flows_policy_view_local -ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') -ORDER BY ( - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - egressNetworkPolicyName, - egressNetworkPolicyNamespace, - egressNetworkPolicyRuleAction, - ingressNetworkPolicyName, - ingressNetworkPolicyNamespace, - ingressNetworkPolicyRuleAction, - sourcePodName, - sourceTransportPort, - sourcePodNamespace, - destinationPodName, - destinationTransportPort, - destinationPodNamespace, - destinationServicePort, - destinationServicePortName, - destinationIP) -TTL timeInserted + INTERVAL {{ .Values.clickhouse.ttl }} -SETTINGS merge_with_ttl_timeout = {{ $ttlTimeout }} -POPULATE -AS SELECT - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - egressNetworkPolicyName, - egressNetworkPolicyNamespace, - egressNetworkPolicyRuleAction, - ingressNetworkPolicyName, - ingressNetworkPolicyNamespace, - ingressNetworkPolicyRuleAction, - sourcePodName, - sourceTransportPort, - sourcePodNamespace, - destinationPodName, - destinationTransportPort, - destinationPodNamespace, - destinationServicePort, - destinationServicePortName, - destinationIP, - sum(octetDeltaCount) AS octetDeltaCount, - sum(reverseOctetDeltaCount) AS reverseOctetDeltaCount, - sum(throughput) AS throughput, - sum(reverseThroughput) AS reverseThroughput, - sum(throughputFromSourceNode) AS throughputFromSourceNode, - sum(reverseThroughputFromSourceNode) AS reverseThroughputFromSourceNode, - sum(throughputFromDestinationNode) AS throughputFromDestinationNode, - sum(reverseThroughputFromDestinationNode) AS reverseThroughputFromDestinationNode -FROM flows_local -GROUP BY - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - egressNetworkPolicyName, - 
egressNetworkPolicyNamespace, - egressNetworkPolicyRuleAction, - ingressNetworkPolicyName, - ingressNetworkPolicyNamespace, - ingressNetworkPolicyRuleAction, - sourcePodName, - sourceTransportPort, - sourcePodNamespace, - destinationPodName, - destinationTransportPort, - destinationPodNamespace, - destinationServicePort, - destinationServicePortName, - destinationIP; - ---Create a table to store the network policy recommendation results -CREATE TABLE IF NOT EXISTS recommendations_local ( - id String, - type String, - timeCreated DateTime, - yamls String -) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') -ORDER BY (timeCreated); - ---Move data from old table and drop the old table -INSERT INTO recommendations_local SELECT * FROM recommendations; -DROP TABLE recommendations; diff --git a/build/charts/theia/templates/_helpers.tpl b/build/charts/theia/templates/_helpers.tpl index 3a7d6875..f93305ac 100644 --- a/build/charts/theia/templates/_helpers.tpl +++ b/build/charts/theia/templates/_helpers.tpl @@ -1,6 +1,6 @@ {{- define "clickhouse.monitor.container" }} {{- $clickhouse := .clickhouse }} -{{- $version := .version }} +{{- $Chart := .Chart }} - name: clickhouse-monitor image: {{ include "clickHouseMonitorImage" . | quote }} imagePullPolicy: {{ $clickhouse.monitor.image.pullPolicy }} @@ -36,9 +36,9 @@ {{- define "clickhouse.server.container" }} {{- $clickhouse := .clickhouse }} {{- $enablePV := .enablePV }} -{{- $version := .version }} +{{- $Chart := .Chart }} - name: clickhouse - image: {{ $clickhouse.image.repository }}:{{ $clickhouse.image.tag }} + image: {{ include "clickHouseServerImage" . 
| quote }} imagePullPolicy: {{ $clickhouse.image.pullPolicy }} volumeMounts: - name: clickhouse-configmap-volume @@ -49,7 +49,19 @@ {{- end }} env: - name: THEIA_VERSION - value: {{ $version }} + value: {{ $Chart.Version }} + - name: DB_URL + value: "localhost:9000" + - name: MIGRATE_USERNAME + valueFrom: + secretKeyRef: + name: clickhouse-secret + key: username + - name: MIGRATE_PASSWORD + valueFrom: + secretKeyRef: + name: clickhouse-secret + key: password {{- end }} {{- define "clickhouse.volume" }} @@ -64,13 +76,13 @@ - key: {{ regexReplaceAll "(.*)/" $path "" }} path: {{ regexReplaceAll "(.*)/" $path "" }} {{- end }} - {{- range $path, $_ := $Files.Glob "provisioning/datasources/migrators/upgrade/*" }} + {{- range $path, $_ := $Files.Glob "provisioning/datasources/migrators/downgrade/*" }} - key: {{ regexReplaceAll "(.*)/" $path "" }} - path: migrators/upgrade/{{ regexReplaceAll "(.*)/" $path "" }} + path: migrators/downgrade/{{ regexReplaceAll "(.*)/" $path "" }} {{- end }} - {{- range $path, $_ := $Files.Glob "provisioning/datasources/migrators/downgrade/*" }} + {{- range $path, $_ := $Files.Glob "provisioning/datasources/migrators/*.sql" }} - key: {{ regexReplaceAll "(.*)/" $path "" }} - path: migrators/downgrade/{{ regexReplaceAll "(.*)/" $path "" }} + path: migrators/{{ regexReplaceAll "(.*)/" $path "" }} {{- end }} {{- if not $enablePV }} - name: clickhouse-storage-volume @@ -80,18 +92,26 @@ {{- end }} {{- end }} -{{- define "clickHouseMonitorImageTag" -}} -{{- if .clickhouse.monitor.image.tag }} -{{- .clickhouse.monitor.image.tag -}} -{{- else if eq .version "latest" }} + +{{- define "theiaImageTag" -}} +{{- $tag := .tag -}} +{{- $Chart := .Chart -}} +{{- if $tag }} +{{- $tag -}} +{{- else if eq $Chart.AppVersion "latest" }} {{- print "latest" -}} {{- else }} -{{- print "v" .version -}} +{{- print "v" $Chart.AppVersion -}} {{- end }} {{- end -}} + {{- define "clickHouseMonitorImage" -}} -{{- print .clickhouse.monitor.image.repository ":" (include 
"clickHouseMonitorImageTag" .) -}} +{{- print .clickhouse.monitor.image.repository ":" (include "theiaImageTag" (dict "tag" .clickhouse.monitor.image.tag "Chart" .Chart)) -}} +{{- end -}} + +{{- define "clickHouseServerImage" -}} +{{- print .clickhouse.image.repository ":" (include "theiaImageTag" (dict "tag" .clickhouse.image.tag "Chart" .Chart)) -}} {{- end -}} {{- define "theiaManagerImageTag" -}} diff --git a/build/charts/theia/templates/clickhouse/clickhouseinstallation.yaml b/build/charts/theia/templates/clickhouse/clickhouseinstallation.yaml index e4e63e6c..962e6769 100644 --- a/build/charts/theia/templates/clickhouse/clickhouseinstallation.yaml +++ b/build/charts/theia/templates/clickhouse/clickhouseinstallation.yaml @@ -64,7 +64,7 @@ spec: - name: pod-template-without-monitor spec: containers: - {{- include "clickhouse.server.container" (dict "clickhouse" .Values.clickhouse "enablePV" $enablePV "version" .Chart.Version) | indent 12 }} + {{- include "clickhouse.server.container" (dict "clickhouse" .Values.clickhouse "enablePV" $enablePV "Chart" .Chart) | indent 12 }} volumes: {{- include "clickhouse.volume" (dict "clickhouse" .Values.clickhouse "enablePV" $enablePV "Files" .Files) | indent 12 }} {{- if .Values.clickhouse.cluster.podDistribution }} @@ -77,9 +77,9 @@ spec: - name: pod-template spec: containers: - {{- include "clickhouse.server.container" (dict "clickhouse" .Values.clickhouse "enablePV" $enablePV "version" .Chart.Version) | indent 12 }} + {{- include "clickhouse.server.container" (dict "clickhouse" .Values.clickhouse "enablePV" $enablePV "Chart" .Chart) | indent 12 }} {{- if .Values.clickhouse.monitor.enable }} - {{- include "clickhouse.monitor.container" (dict "clickhouse" .Values.clickhouse "version" .Chart.AppVersion) | indent 12 }} + {{- include "clickhouse.monitor.container" (dict "clickhouse" .Values.clickhouse "Chart" .Chart) | indent 12 }} {{- end }} volumes: {{- include "clickhouse.volume" (dict "clickhouse" .Values.clickhouse 
"enablePV" $enablePV "Files" .Files) | indent 12 }} diff --git a/build/charts/theia/templates/clickhouse/configmap.yaml b/build/charts/theia/templates/clickhouse/configmap.yaml index efa24a51..b75e8ae1 100644 --- a/build/charts/theia/templates/clickhouse/configmap.yaml +++ b/build/charts/theia/templates/clickhouse/configmap.yaml @@ -5,5 +5,5 @@ metadata: namespace: {{ .Release.Namespace }} data: {{ tpl (.Files.Glob "provisioning/datasources/*.sh").AsConfig . | indent 2 }} -{{ tpl (.Files.Glob "provisioning/datasources/migrators/upgrade/*").AsConfig . | indent 2 }} {{ tpl (.Files.Glob "provisioning/datasources/migrators/downgrade/*").AsConfig . | indent 2 }} +{{ tpl (.Files.Glob "provisioning/datasources/migrators/*.sql").AsConfig . | indent 2 }} diff --git a/build/charts/theia/values.yaml b/build/charts/theia/values.yaml index fb49bb5c..823ef6ae 100644 --- a/build/charts/theia/values.yaml +++ b/build/charts/theia/values.yaml @@ -3,7 +3,7 @@ clickhouse: image: repository: "projects.registry.vmware.com/antrea/theia-clickhouse-server" pullPolicy: "IfNotPresent" - tag: "22.6" + tag: "" monitor: # -- Determine whether to run a monitor to periodically check the ClickHouse # memory usage and clean data. diff --git a/build/images/Dockerfile.clickhouse-server.ubuntu b/build/images/Dockerfile.clickhouse-server.ubuntu new file mode 100644 index 00000000..7e1c0688 --- /dev/null +++ b/build/images/Dockerfile.clickhouse-server.ubuntu @@ -0,0 +1,14 @@ +ARG GO_VERSION +FROM golang:${GO_VERSION} as clickhouse-schema-management-build + +COPY . /theia +WORKDIR /theia + +RUN make clickhouse-schema-management-plugin + +FROM docker.io/clickhouse/clickhouse-server:22.6 + +LABEL maintainer="Antrea " +LABEL description="A docker image to deploy the ClickHouse server." 
+ +COPY --from=clickhouse-schema-management-build /theia/bin/clickhouse-schema-management / diff --git a/build/yamls/flow-visibility.yml b/build/yamls/flow-visibility.yml index 681639b9..5befd444 100644 --- a/build/yamls/flow-visibility.yml +++ b/build/yamls/flow-visibility.yml @@ -53,7 +53,10 @@ subjects: --- apiVersion: v1 data: - 0-1-0_0-2-0.sql: | + 0-3-0_0-2-0.sql: | + -- place holder + 000001_0-1-0.down.sql: "" + 000001_0-1-0.up.sql: | --Create a table to store records CREATE TABLE IF NOT EXISTS flows_local ( timeInserted DateTime DEFAULT now(), @@ -117,184 +120,6 @@ data: DROP VIEW flows_node_view; DROP VIEW flows_policy_view; - --Create a Materialized View to aggregate data for pods - CREATE MATERIALIZED VIEW IF NOT EXISTS flows_pod_view_local - ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') - ORDER BY ( - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourcePodName, - destinationPodName, - destinationIP, - destinationServicePort, - destinationServicePortName, - flowType, - sourcePodNamespace, - destinationPodNamespace, - sourceTransportPort, - destinationTransportPort) - TTL timeInserted + INTERVAL 12 HOUR - SETTINGS merge_with_ttl_timeout = 14400 - POPULATE - AS SELECT - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourcePodName, - destinationPodName, - destinationIP, - destinationServicePort, - destinationServicePortName, - flowType, - sourcePodNamespace, - destinationPodNamespace, - sourceTransportPort, - destinationTransportPort, - sum(octetDeltaCount) AS octetDeltaCount, - sum(reverseOctetDeltaCount) AS reverseOctetDeltaCount, - sum(throughput) AS throughput, - sum(reverseThroughput) AS reverseThroughput, - sum(throughputFromSourceNode) AS throughputFromSourceNode, - sum(throughputFromDestinationNode) AS throughputFromDestinationNode - FROM flows_local - GROUP BY - timeInserted, - 
flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourcePodName, - destinationPodName, - destinationIP, - destinationServicePort, - destinationServicePortName, - flowType, - sourcePodNamespace, - destinationPodNamespace, - sourceTransportPort, - destinationTransportPort; - - --Create a Materialized View to aggregate data for nodes - CREATE MATERIALIZED VIEW IF NOT EXISTS flows_node_view_local - ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') - ORDER BY ( - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourceNodeName, - destinationNodeName, - sourcePodNamespace, - destinationPodNamespace) - TTL timeInserted + INTERVAL 12 HOUR - SETTINGS merge_with_ttl_timeout = 14400 - POPULATE - AS SELECT - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourceNodeName, - destinationNodeName, - sourcePodNamespace, - destinationPodNamespace, - sum(octetDeltaCount) AS octetDeltaCount, - sum(reverseOctetDeltaCount) AS reverseOctetDeltaCount, - sum(throughput) AS throughput, - sum(reverseThroughput) AS reverseThroughput, - sum(throughputFromSourceNode) AS throughputFromSourceNode, - sum(reverseThroughputFromSourceNode) AS reverseThroughputFromSourceNode, - sum(throughputFromDestinationNode) AS throughputFromDestinationNode, - sum(reverseThroughputFromDestinationNode) AS reverseThroughputFromDestinationNode - FROM flows_local - GROUP BY - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - sourceNodeName, - destinationNodeName, - sourcePodNamespace, - destinationPodNamespace; - - --Create a Materialized View to aggregate data for network policies - CREATE MATERIALIZED VIEW IF NOT EXISTS flows_policy_view_local - ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') - ORDER BY ( - timeInserted, - 
flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - egressNetworkPolicyName, - egressNetworkPolicyNamespace, - egressNetworkPolicyRuleAction, - ingressNetworkPolicyName, - ingressNetworkPolicyNamespace, - ingressNetworkPolicyRuleAction, - sourcePodName, - sourceTransportPort, - sourcePodNamespace, - destinationPodName, - destinationTransportPort, - destinationPodNamespace, - destinationServicePort, - destinationServicePortName, - destinationIP) - TTL timeInserted + INTERVAL 12 HOUR - SETTINGS merge_with_ttl_timeout = 14400 - POPULATE - AS SELECT - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - egressNetworkPolicyName, - egressNetworkPolicyNamespace, - egressNetworkPolicyRuleAction, - ingressNetworkPolicyName, - ingressNetworkPolicyNamespace, - ingressNetworkPolicyRuleAction, - sourcePodName, - sourceTransportPort, - sourcePodNamespace, - destinationPodName, - destinationTransportPort, - destinationPodNamespace, - destinationServicePort, - destinationServicePortName, - destinationIP, - sum(octetDeltaCount) AS octetDeltaCount, - sum(reverseOctetDeltaCount) AS reverseOctetDeltaCount, - sum(throughput) AS throughput, - sum(reverseThroughput) AS reverseThroughput, - sum(throughputFromSourceNode) AS throughputFromSourceNode, - sum(reverseThroughputFromSourceNode) AS reverseThroughputFromSourceNode, - sum(throughputFromDestinationNode) AS throughputFromDestinationNode, - sum(reverseThroughputFromDestinationNode) AS reverseThroughputFromDestinationNode - FROM flows_local - GROUP BY - timeInserted, - flowEndSeconds, - flowEndSecondsFromSourceNode, - flowEndSecondsFromDestinationNode, - egressNetworkPolicyName, - egressNetworkPolicyNamespace, - egressNetworkPolicyRuleAction, - ingressNetworkPolicyName, - ingressNetworkPolicyNamespace, - ingressNetworkPolicyRuleAction, - sourcePodName, - sourceTransportPort, - sourcePodNamespace, - destinationPodName, - destinationTransportPort, - 
destinationPodNamespace, - destinationServicePort, - destinationServicePortName, - destinationIP; - --Create a table to store the network policy recommendation results CREATE TABLE IF NOT EXISTS recommendations_local ( id String, @@ -307,8 +132,6 @@ data: --Move data from old table and drop the old table INSERT INTO recommendations_local SELECT * FROM recommendations; DROP TABLE recommendations; - 0-2-0_0-1-0.sql: | - -- place holder create_table.sh: | #!/usr/bin/env bash @@ -590,7 +413,7 @@ data: engine=Distributed('{cluster}', default, recommendations_local, rand()); EOSQL } - init.sh: | + init.sh: |+ #!/usr/bin/env bash # Copyright 2022 Antrea Authors. @@ -611,38 +434,9 @@ data: THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" - source $THIS_DIR/migrate.sh source $THIS_DIR/create_table.sh - migrate - createTable - setDataVersion - migrate.sh: | - #!/usr/bin/env bash - - # Copyright 2022 Antrea Authors. - # - # Licensed under the Apache License, Version 2.0 (the "License"); - # you may not use this file except in compliance with the License. - # You may obtain a copy of the License at - # - # http://www.apache.org/licenses/LICENSE-2.0 - # - # Unless required by applicable law or agreed to in writing, software - # distributed under the License is distributed on an "AS IS" BASIS, - # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - # See the License for the specific language governing permissions and - # limitations under the License. 
- - function checkDataVersion { - tables=$(clickhouse client -h 127.0.0.1 -q "SHOW TABLES") - if [[ $tables == *"migrate_version"* ]]; then - dataVersion=$(clickhouse client -h 127.0.0.1 -q "SELECT version FROM migrate_version") - elif [[ $tables == *"flows"* ]]; then - dataVersion="0.1.0" - fi - } - + # This function is kept to be compatible with version below v0.3.0 function setDataVersion { tables=$(clickhouse client -h 127.0.0.1 -q "SHOW TABLES") if [[ $tables == *"migrate_version"* ]]; then @@ -654,98 +448,10 @@ data: echo "=== Set data schema version to 0.3.0 ===" } - function addVersionsToList { - if [[ "$versionListStr" != *"$1"* ]]; then - versionListStr+="$1," - fi - } - - function getMigrationPath { - # Get versions based on the SQL file names - versionListStr="" - for fileName in $(ls "$migratorDir/upgrade") - do - # fileName upgrading from v0.1.0 to v0.2.0: 0-1-0_0-2-0.sql - fileName=$(basename $fileName .sql) - versionPair=(${fileName//_/ }) - addVersionsToList ${versionPair[0]//-/.} - addVersionsToList ${versionPair[1]//-/.} - done - addVersionsToList $dataVersion - addVersionsToList $theiaVersion - # Sort the versions to generate the migration path - versionList=(${versionListStr//,/ }) - old_IFS=$IFS - IFS=$'\n' sortedversionList=($(sort -V <<<"${versionList[*]}")) - IFS=$old_IFS - # Define the upgrading/downgrading path - index=0 - prev="" - for version in "${sortedversionList[@]}" - do - migratePathStr+="$index:$version, " - index=$((index+1)) - if [[ -z "$prev" ]]; then - prev=${version//./-} - else - curr=${version//./-} - UPGRADING_MIGRATORS+=("${prev}_${curr}.sql") - DOWNGRADING_MIGRATORS+=("${curr}_${prev}.sql") - prev=$curr - fi - done - } - - function version_lt() { test "$(printf '%s\n' "$@" | sort -rV | head -n 1)" != "$1"; } - - function migrate { - THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" - migratorDir="/var/lib/clickhouse/migrators" - mkdir -p $migratorDir - - checkDataVersion - if [[ -z 
"$dataVersion" ]]; then - cp -r $THIS_DIR/migrators/* $migratorDir - echo "=== No existing data schema. Migration skipped. ===" - return 0 - fi - if [[ -z "$THEIA_VERSION" ]]; then - echo "=== Unable to load the environment variable THEIA_VERSION. Migration failed. ===" - exit 1 - fi - theiaVersion=$THEIA_VERSION - - migratePathStr="" - getMigrationPath - - cp -r $THIS_DIR/migrators/* $migratorDir - - dataVersionIndex=$(echo $migratePathStr | tr ', ' '\n' | grep "$dataVersion" | sed 's/:/ /g' | awk '{print $1}') - theiaVersionIndex=$(echo $migratePathStr | tr ', ' '\n' | grep "$theiaVersion" | sed 's/:/ /g' | awk '{print $1}') + ../clickhouse-schema-management + createTable + setDataVersion - # Update along the path - if [[ "$dataVersionIndex" -lt "$theiaVersionIndex" ]]; then - for i in $(seq $dataVersionIndex $((theiaVersionIndex-1)) ); - do - if test -f "$migratorDir/upgrade/${UPGRADING_MIGRATORS[$i]}"; then - echo "=== Apply file ${UPGRADING_MIGRATORS[$i]} ===" - clickhouse client -h 127.0.0.1 --queries-file $migratorDir/upgrade/${UPGRADING_MIGRATORS[$i]} - fi - done - # Downgrade along the path - elif [[ "$dataVersionIndex" -gt "$theiaVersionIndex" ]]; then - for i in $(seq $((dataVersionIndex-1)) -1 $theiaVersionIndex); - do - if test -f "$migratorDir/downgrade/${DOWNGRADING_MIGRATORS[$i]}"; then - echo "=== Apply file ${DOWNGRADING_MIGRATORS[$i]} ===" - clickhouse client -h 127.0.0.1 --queries-file $migratorDir/downgrade/${DOWNGRADING_MIGRATORS[$i]} - fi - done - else - echo "=== Data schema version is the same as Theia version. Migration finished. 
===" - fi - setDataVersion - } kind: ConfigMap metadata: name: clickhouse-mounted-configmap @@ -6534,7 +6240,19 @@ spec: - env: - name: THEIA_VERSION value: 0.3.0 - image: projects.registry.vmware.com/antrea/theia-clickhouse-server:22.6 + - name: DB_URL + value: localhost:9000 + - name: MIGRATE_USERNAME + valueFrom: + secretKeyRef: + key: username + name: clickhouse-secret + - name: MIGRATE_PASSWORD + valueFrom: + secretKeyRef: + key: password + name: clickhouse-secret + image: projects.registry.vmware.com/antrea/theia-clickhouse-server:latest imagePullPolicy: IfNotPresent name: clickhouse volumeMounts: @@ -6579,12 +6297,12 @@ spec: path: create_table.sh - key: init.sh path: init.sh - - key: migrate.sh - path: migrate.sh - - key: 0-1-0_0-2-0.sql - path: migrators/upgrade/0-1-0_0-2-0.sql - - key: 0-2-0_0-1-0.sql - path: migrators/downgrade/0-2-0_0-1-0.sql + - key: 0-3-0_0-2-0.sql + path: migrators/downgrade/0-3-0_0-2-0.sql + - key: 000001_0-1-0.down.sql + path: migrators/000001_0-1-0.down.sql + - key: 000001_0-1-0.up.sql + path: migrators/000001_0-1-0.up.sql name: clickhouse-mounted-configmap name: clickhouse-configmap-volume - emptyDir: diff --git a/ci/jenkins/test-vmc.sh b/ci/jenkins/test-vmc.sh index 7b6f2424..efffc24f 100644 --- a/ci/jenkins/test-vmc.sh +++ b/ci/jenkins/test-vmc.sh @@ -360,17 +360,16 @@ function deliver_antrea { docker pull antrea/flow-aggregator:latest docker pull projects.registry.vmware.com/antrea/theia-spark-operator:v1beta2-1.3.3-3.1.1 docker pull projects.registry.vmware.com/antrea/theia-zookeeper:3.8.0 - docker pull projects.registry.vmware.com/antrea/theia-clickhouse-server:22.6 docker save -o antrea-ubuntu.tar antrea/antrea-ubuntu:latest docker save -o flow-aggregator.tar antrea/flow-aggregator:latest docker save -o theia-spark-operator.tar projects.registry.vmware.com/antrea/theia-spark-operator:v1beta2-1.3.3-3.1.1 docker save -o theia-zookeeper.tar projects.registry.vmware.com/antrea/theia-zookeeper:3.8.0 - docker save -o 
theia-clickhouse-server.tar projects.registry.vmware.com/antrea/theia-clickhouse-server:22.6 - (cd $GIT_CHECKOUT_DIR && make policy-recommendation && make clickhouse-monitor) + (cd $GIT_CHECKOUT_DIR && make policy-recommendation && make clickhouse-monitor && make clickhouse-server) docker save -o theia-policy-recommendation.tar projects.registry.vmware.com/antrea/theia-policy-recommendation:latest docker save -o theia-clickhouse-monitor.tar projects.registry.vmware.com/antrea/theia-clickhouse-monitor:latest + docker save -o theia-clickhouse-server.tar projects.registry.vmware.com/antrea/theia-clickhouse-server:latest # not sure the exact image tag, so read from yaml # and we assume the image tag is the same for all images in this yaml @@ -389,13 +388,13 @@ function deliver_antrea { ssh-keygen -f "/var/lib/jenkins/.ssh/known_hosts" -R ${IPs[$i]} copy_image antrea-ubuntu.tar docker.io/antrea/antrea-ubuntu ${IPs[$i]} latest true copy_image flow-aggregator.tar docker.io/antrea/flow-aggregator ${IPs[$i]} latest true - copy_image theia-clickhouse-server.tar projects.registry.vmware.com/antrea/theia-clickhouse-server ${IPs[$i]} 22.6 true copy_image theia-clickhouse-operator.tar projects.registry.vmware.com/antrea/theia-clickhouse-operator ${IPs[$i]} $image_tag true copy_image theia-metrics-exporter.tar projects.registry.vmware.com/antrea/theia-metrics-exporter ${IPs[$i]} $image_tag true copy_image theia-zookeeper.tar projects.registry.vmware.com/antrea/theia-zookeeper ${IPs[$i]} 3.8.0 true copy_image theia-spark-operator.tar projects.registry.vmware.com/antrea/theia-spark-operator ${IPs[$i]} v1beta2-1.3.3-3.1.1 true copy_image theia-policy-recommendation.tar projects.registry.vmware.com/antrea/theia-policy-recommendation ${IPs[$i]} latest true copy_image theia-clickhouse-monitor.tar projects.registry.vmware.com/antrea/theia-clickhouse-monitor ${IPs[$i]} latest true + copy_image theia-clickhouse-server.tar projects.registry.vmware.com/antrea/theia-clickhouse-server 
${IPs[$i]} latest true done } diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 13bd4c6a..fdaf9a80 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -111,7 +111,6 @@ COMMON_IMAGES_LIST=("k8s.gcr.io/e2e-test-images/agnhost:2.29" \ "antrea/flow-aggregator:latest" \ "projects.registry.vmware.com/antrea/theia-clickhouse-operator:0.18.2" \ "projects.registry.vmware.com/antrea/theia-metrics-exporter:0.18.2" \ - "projects.registry.vmware.com/antrea/theia-clickhouse-server:22.6" \ "projects.registry.vmware.com/antrea/theia-zookeeper:3.8.0" \ "projects.registry.vmware.com/antrea/theia-grafana:8.3.3" \ "projects.registry.vmware.com/antrea/theia-spark-operator:v1beta2-1.3.3-3.1.1") @@ -124,7 +123,8 @@ for image in "${COMMON_IMAGES_LIST[@]}"; do done COMMON_IMAGES_LIST+=("projects.registry.vmware.com/antrea/theia-policy-recommendation:latest"\ - "projects.registry.vmware.com/antrea/theia-clickhouse-monitor:latest") + "projects.registry.vmware.com/antrea/theia-clickhouse-monitor:latest"\ + "projects.registry.vmware.com/antrea/theia-clickhouse-server:latest") printf -v COMMON_IMAGES "%s " "${COMMON_IMAGES_LIST[@]}" function setup_cluster { diff --git a/ci/kind/test-upgrade-theia.sh b/ci/kind/test-upgrade-theia.sh index 80a432e6..d05e7bf7 100755 --- a/ci/kind/test-upgrade-theia.sh +++ b/ci/kind/test-upgrade-theia.sh @@ -143,17 +143,24 @@ fi echo "Running upgrade test for tag $THEIA_FROM_TAG" ANTREA_FROM_TAG=$(grep "$THEIA_FROM_TAG" < $ROOT_DIR/VERSION_MAP | awk '{print $2}') +# From v0.3.0, ClickHouse Image is labeled with Theia version +if [[ $THEIA_FROM_TAG == "v0.2.0" || $THEIA_FROM_TAG == "v0.1.0" ]]; then + CLICKHOUSE_FROM_TAG="21.11" +else + CLICKHOUSE_FROM_TAG=$THEIA_FROM_TAG +fi + DOCKER_IMAGES=("k8s.gcr.io/e2e-test-images/agnhost:2.29" \ "projects.registry.vmware.com/antrea/busybox" \ "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \ "projects.registry.vmware.com/antrea/perftool" \ 
"projects.registry.vmware.com/antrea/theia-clickhouse-operator:0.18.2" \ "projects.registry.vmware.com/antrea/theia-metrics-exporter:0.18.2" \ - "projects.registry.vmware.com/antrea/theia-clickhouse-server:22.6" \ "projects.registry.vmware.com/antrea/theia-zookeeper:3.8.0" \ "projects.registry.vmware.com/antrea/theia-grafana:8.3.3" \ "projects.registry.vmware.com/antrea/antrea-ubuntu:$ANTREA_FROM_TAG" \ "projects.registry.vmware.com/antrea/theia-clickhouse-monitor:$THEIA_FROM_TAG" \ + "projects.registry.vmware.com/antrea/theia-clickhouse-server:$CLICKHOUSE_FROM_TAG" \ "antrea/antrea-ubuntu:latest") for img in "${DOCKER_IMAGES[@]}"; do @@ -164,7 +171,8 @@ for img in "${DOCKER_IMAGES[@]}"; do done done -DOCKER_IMAGES+=("projects.registry.vmware.com/antrea/theia-clickhouse-monitor:latest") +DOCKER_IMAGES+=("projects.registry.vmware.com/antrea/theia-clickhouse-monitor:latest\ + projects.registry.vmware.com/antrea/theia-clickhouse-server:latest") echo "Creating Kind cluster" IMAGES="${DOCKER_IMAGES[@]}" diff --git a/go.mod b/go.mod index 7de11a6a..0446660b 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( github.com/go-openapi/jsonreference v0.19.5 // indirect github.com/go-openapi/swag v0.19.14 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-migrate/migrate v3.5.4+incompatible github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect diff --git a/go.sum b/go.sum index 3f0260e3..86cc1a87 100644 --- a/go.sum +++ b/go.sum @@ -266,6 +266,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-migrate/migrate 
v3.5.4+incompatible h1:R7OzwvCJTCgwapPCiX6DyBiu2czIUMDCB118gFTKTUA= +github.com/golang-migrate/migrate v3.5.4+incompatible/go.mod h1:IsVUlFN5puWOmXrqjgGUfIRIbU7mr8oNBE2tyERd9Wk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= diff --git a/hack/generate-manifest.sh b/hack/generate-manifest.sh index 4869c0c4..5efc2344 100755 --- a/hack/generate-manifest.sh +++ b/hack/generate-manifest.sh @@ -143,6 +143,11 @@ if [ "$MODE" == "dev" ] && [ -n "$IMG_NAME" ]; then fi if [ "$MODE" == "release" ]; then HELM_VALUES+=("clickhouse.monitor.image.repository=$IMG_NAME" "clickhouse.monitor.image.tag=$IMG_TAG") + if [ $IMG_TAG == "v0.1.0" ] || [ $IMG_TAG == "v0.2.0" ]; then + HELM_VALUES+=("clickhouse.image.tag=21.11") + else + HELM_VALUES+=("clickhouse.image.tag=$IMG_TAG") + fi fi if [ "$MODE" == "antrea-e2e" ]; then HELM_VALUES+=("grafana.enable=false" "clickhouse.monitor.enable=false") diff --git a/plugins/clickhouse-schema-management/main.go b/plugins/clickhouse-schema-management/main.go new file mode 100644 index 00000000..10356db9 --- /dev/null +++ b/plugins/clickhouse-schema-management/main.go @@ -0,0 +1,338 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "database/sql" + "fmt" + "os" + "os/exec" + "strconv" + "strings" + "time" + + "github.com/ClickHouse/clickhouse-go" + "github.com/golang-migrate/migrate" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + + _ "github.com/golang-migrate/migrate/database/clickhouse" + _ "github.com/golang-migrate/migrate/source/file" +) + +var ( + versionMap = make(map[string]int) + migratorTmpPath = "/docker-entrypoint-initdb.d/migrators" + migratorPersistentPath = "/var/lib/clickhouse/migrators" + clickHouseURL string + execCommand = exec.Command + readDir = os.ReadDir + getEnv = os.Getenv + openSql = sql.Open + newMigrate = migrate.New +) + +func main() { + clickhouseMigrate, err := initMigration() + if err != nil { + klog.ErrorS(err, "Error when initializing migration") + } + defer clickhouseMigrate.Close() + if err := startMigration(clickhouseMigrate); err != nil { + klog.ErrorS(err, "Error when migrating") + } +} + +func initMigration() (*migrate.Migrate, error) { + // Copy migrators from tmp path to persistent path for the downgrading usage in the future + if err := copyMigrators(); err != nil { + return nil, fmt.Errorf("error when copying migrators: %v", err) + } + // versionMap is used to map Theia version string to golang-migrate version number + if err := initializeVersionMap(); err != nil { + return nil, fmt.Errorf("error when generating version number map: %v", err) + } + userName := getEnv("MIGRATE_USERNAME") + password := getEnv("MIGRATE_PASSWORD") + databaseURL := getEnv("DB_URL") + if len(userName) == 0 || len(password) == 0 || len(databaseURL) == 0 { + return nil, fmt.Errorf("unable to load environment variables, MIGRATE_USERNAME, MIGRATE_PASSWORD and DB_URL must be defined") + } + clickHouseURL = fmt.Sprintf("%s?username=%s&password=%s", databaseURL, userName, password) + migrateDatabaseURL := 
fmt.Sprintf("clickhouse://%s&x-multi-statement=true", clickHouseURL) + migrateSourceURL := fmt.Sprintf("file://%s", migratorPersistentPath) + clickhouseMigrate, err := newMigrate(migrateSourceURL, migrateDatabaseURL) + if err != nil { + return nil, fmt.Errorf("error when creating a Migrate instance for ClickHouse: %v", err) + } + return clickhouseMigrate, nil +} + +// Get Theia version and data version number, migrate if they are different +func startMigration(clickhouseMigrate *migrate.Migrate) error { + theiaVersionNumber, err := getTheiaVerstionNumber() + if err != nil { + return fmt.Errorf("error when getting Theia version: %v", err) + } + dataVersionNumber, err := getDataVersionNumber(*clickhouseMigrate) + if err != nil { + return fmt.Errorf("error when getting the data version: %v", err) + } + if theiaVersionNumber == dataVersionNumber { + klog.InfoS("Data schema version is the same as Theia version. Migration skipped.") + } else if dataVersionNumber == -1 { + klog.InfoS("No existing data schema. Migration skipped.") + } else { + klog.InfoS("Migrate data schema", "from", dataVersionNumber, "to", theiaVersionNumber) + err = clickhouseMigrate.Steps(theiaVersionNumber - dataVersionNumber) + if err != nil { + return fmt.Errorf("error when applying migrations: %v", err) + } + } + // Set the data schema version to Theia version anyway, as we expect initial + // data will be created even if the migration is skipped. 
+ err = clickhouseMigrate.Force(theiaVersionNumber) + if err != nil { + return fmt.Errorf("error when setting version: %v", err) + } + return nil +} + +func copyMigrators() error { + bashCmd := fmt.Sprintf("mkdir -p %s", migratorPersistentPath) + cmd := execCommand("bash", "-c", bashCmd) + if err := cmd.Run(); err != nil { + return fmt.Errorf("error when creating folder: %s, error: %v", bashCmd, err) + } + bashCmd = fmt.Sprintf("cp -r %s/* %s", migratorTmpPath, migratorPersistentPath) + cmd = execCommand("bash", "-c", bashCmd) + if err := cmd.Run(); err != nil { + return fmt.Errorf("error when copying files: %s, error: %v", bashCmd, err) + } + return nil +} + +// Use the file names to map the Theia version string to golang-migrate version number +func initializeVersionMap() error { + files, err := readDir(migratorPersistentPath) + if err != nil { + return fmt.Errorf("unable to get files in folder migrators: %v", err) + } + for _, file := range files { + if file.IsDir() { + continue + } + // fileName example: 000001_0-1-0.down.sql + fileNameArr := strings.Split(strings.Split(file.Name(), ".")[0], "_") + versionNumber, err := strconv.ParseInt(fileNameArr[0], 10, 64) + if err != nil { + return fmt.Errorf("error when parsing the version number: %v", err) + } + version := strings.Replace(fileNameArr[1], "-", ".", -1) + // File _.up.sql is expected to be + // applied when upgrading from . + // golang-migrate applies this file when upgrading from - 1. 
+ versionMap[version] = int(versionNumber - 1) + } + return nil +} + +// Get Theia version based on the environment variable +func getTheiaVerstionNumber() (int, error) { + theiaVersion := getEnv("THEIA_VERSION") + if len(theiaVersion) == 0 { + return 0, fmt.Errorf("unable to load environment variables, THEIA_VERSION must be defined") + } + theiaVersionNumber, err := getVersionNumber(theiaVersion) + if err != nil { + return theiaVersionNumber, fmt.Errorf("error when getting theia version number for %s: %v", theiaVersion, err) + } + return theiaVersionNumber, nil +} + +// From v0.3, get data version based on version table +// For v0.1 and v0.2, determine version based on tables in database +func getDataVersionNumber(clickhouseMigrate migrate.Migrate) (int, error) { + var version int + // Get data version for version before v0.3 + versionStr, err := getDataVersionBasedOnTables() + if err != nil { + return 0, fmt.Errorf("error when getting data version based on tables: %v", err) + } + if versionStr != "" { + version, err = getVersionNumber(versionStr) + if err != nil { + return version, fmt.Errorf("error when getting version number for %s: %v", versionStr, err) + } + if version > 0 { + // Set data version for golang-migrate tool + err = clickhouseMigrate.Force(version) + if err != nil { + return 0, fmt.Errorf("error when collarating data version: %v", err) + } + } + return version, nil + } + // Get data version for version after v0.3 + uintVersion, _, err := clickhouseMigrate.Version() + if err != nil && err != migrate.ErrNilVersion { + return version, fmt.Errorf("error when getting migration version: %v", err) + } + // No data schema created before + if err == migrate.ErrNilVersion { + return -1, nil + } + version = int(uintVersion) + return version, nil +} + +func getVersionNumber(version string) (int, error) { + versionNumber, ok := versionMap[version] + if !ok { + var err error + // In case data schema does not change between version A and B (assuming B is the 
later version) + // we will not have file xxx_A.up.sql and xxx_A.down.sql + // In migration, we can treat version A the same as version C, + // which is the first version after A having data schema changes comparing its next version. + versionNumber, err = roundUpVersion(version) + if err != nil { + return versionNumber, fmt.Errorf("error when rounding up version: %v", err) + } + } + return versionNumber, nil +} + +func roundUpVersion(version string) (int, error) { + var versionNumber int + for key, value := range versionMap { + less, err := versionLessThan(key, version) + if err != nil { + return versionNumber, fmt.Errorf("error when comparing version %s and %s: %v", key, version, err) + } + if less && (versionNumber < value+1) { + versionNumber = value + 1 + } + } + return versionNumber, nil +} + +// Return true if version a is earlier than version b. +func versionLessThan(a, b string) (bool, error) { + as := strings.Split(a, ".") + bs := strings.Split(b, ".") + for i := 0; i < len(as); i++ { + xi, err := strconv.Atoi(as[i]) + if err != nil { + return false, fmt.Errorf("error when parsing version %s: %v", a, err) + } + yi, err := strconv.Atoi(bs[i]) + if err != nil { + return false, fmt.Errorf("error when parsing version %s: %v", b, err) + } + if xi == yi { + continue + } + return (xi < yi), nil + } + return false, nil +} + +// go-migrate is introduced in Theia v0.3. It cannot distinguish versions before v0.3 automatically. +// If table flows_local is created, the data version is v0.2. +// If only table flows_local does not exist while table flows is created, the data version is v0.1. +// If non of these two tables are found, it means no data schema has been created before. +func getDataVersionBasedOnTables() (string, error) { + // Query to ClickHouse time out if it fails for 10 seconds. + queryTimeout := 10 * time.Second + // Retry query to ClickHouse every second if it fails. 
+ queryRetryInterval := 1 * time.Second + + connect, err := connectClickHouse() + if err != nil { + return "", fmt.Errorf("error when connecting to ClickHouse: %v", err) + } + var tableName string + var version string + command := "SHOW TABLES" + var containsNewVersionTable, containsFlows bool + if err := wait.PollImmediate(queryRetryInterval, queryTimeout, func() (bool, error) { + rows, err := connect.Query(command) + if err != nil { + return false, nil + } else { + for rows.Next() { + if err := rows.Scan(&tableName); err != nil { + return false, nil + } + klog.Info(tableName) + if tableName == "migrate_version" { + var versionFromOldTable string + err = connect.QueryRow("SELECT * FROM migrate_version").Scan(&versionFromOldTable) + if err != nil { + return false, nil + } + if versionFromOldTable == "0.2.0" { + version = versionFromOldTable + } + } + if tableName == "flows" { + containsFlows = true + } + if tableName == "schema_migrations" { + containsNewVersionTable = true + } + } + return true, nil + } + }); err != nil { + return version, err + } + if containsFlows && !containsNewVersionTable { + version = "0.1.0" + } + return version, nil +} + +func connectClickHouse() (*sql.DB, error) { + var connect *sql.DB + var connErr error + connRetryInterval := 1 * time.Second + connTimeout := 10 * time.Second + + // Connect to ClickHouse in a loop + if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) { + // Open the database and ping it + var err error + url := fmt.Sprintf("tcp://%s", clickHouseURL) + connect, err = openSql("clickhouse", url) + if err != nil { + connErr = fmt.Errorf("failed to open ClickHouse: %v", err) + return false, nil + } + if err := connect.Ping(); err != nil { + if exception, ok := err.(*clickhouse.Exception); ok { + connErr = fmt.Errorf("failed to ping ClickHouse: %v", exception.Message) + } else { + connErr = fmt.Errorf("failed to ping ClickHouse: %v", err) + } + return false, nil + } else { + return true, nil + } + 
// fakeDirEntry is a minimal in-memory implementation of the os.DirEntry
// (fs.DirEntry) interface, used to fake directory listings in tests.
type fakeDirEntry struct {
	name  string
	isDir bool
}

// Name returns the file name configured for the entry.
func (f fakeDirEntry) Name() string {
	return f.name
}

// IsDir reports whether the entry was configured as a directory.
func (f fakeDirEntry) IsDir() bool {
	return f.isDir
}

// Type returns fs.ModeDir for directory entries and 0 (a regular file) for
// everything else, so that it always agrees with IsDir.
func (f fakeDirEntry) Type() fs.FileMode {
	if f.isDir {
		return fs.ModeDir
	}
	return 0
}

// Info is not needed by the tests; it always returns a nil fs.FileInfo and a
// nil error.
func (f fakeDirEntry) Info() (fs.FileInfo, error) {
	return nil, nil
}
io.ReadAll(v.Body) + if err != nil { + panic(err) + } + v.Body = io.NopCloser(bytes.NewReader(body)) + + r = append(r, string(body[:])) + } else { + r = append(r, "") + } + } + return r +} + +// mr is a convenience func to create a new *Migration from the raw database query +func mr(value string) *migrate.Migration { + return &migrate.Migration{ + Body: io.NopCloser(strings.NewReader(value)), + } +} + +var ( + sourceInstance source.Driver + databaseInstance database.Driver + fakeExecCommand = func(name string, arg ...string) *exec.Cmd { + cmdArr := []string{name} + cmdArr = append(cmdArr, arg...) + return exec.Command("echo", cmdArr...) + } + + fakeReadDir = func(name string) ([]os.DirEntry, error) { + fileUpEntry1 := fakeDirEntry{name: "000001_0-1-0.up.sql", isDir: false} + fileDownEntry1 := fakeDirEntry{name: "000001_0-1-0.down.sql", isDir: false} + fileUpEntry2 := fakeDirEntry{name: "000002_0-3-0.up.sql", isDir: false} + fileDownEntry2 := fakeDirEntry{name: "000002_0-3-0.down.sql", isDir: false} + fileUpEntry3 := fakeDirEntry{name: "000003_0-5-0.up.sql", isDir: false} + fileDownEntry3 := fakeDirEntry{name: "000003_0-5-0.down.sql", isDir: false} + folderEntry := fakeDirEntry{name: "folder", isDir: true} + return []os.DirEntry{fileUpEntry1, fileDownEntry1, fileUpEntry2, fileDownEntry2, fileUpEntry3, fileDownEntry3, folderEntry}, nil + } + + fakeGetEnv = func(key string) string { + switch key { + case "MIGRATE_USERNAME": + return "username" + case "MIGRATE_PASSWORD": + return "password" + case "DB_URL": + return "localhost:9000" + case "THEIA_VERSION": + return "0.4.0" + default: + return "" + } + } + + fakeNewMigrate = func(sourceURL, databaseURL string) (*migrate.Migrate, error) { + sourceStubMigrations := source.NewMigrations() + sourceStubMigrations.Append(&source.Migration{Version: 1, Direction: source.Up, Identifier: "CREATE 1"}) + sourceStubMigrations.Append(&source.Migration{Version: 1, Direction: source.Down, Identifier: "DROP 1"}) + 
// Shared stub drivers and fake implementations of the package-level hooks
// (execCommand, readDir, getEnv, newMigrate) that the tests install.
var (
	// sourceInstance and databaseInstance hold the stub drivers handed to
	// golang-migrate by fakeNewMigrate; each test case re-creates them.
	sourceInstance   source.Driver
	databaseInstance database.Driver
	// fakeExecCommand replaces the requested command by an "echo" of the
	// command name and its arguments.
	fakeExecCommand = func(name string, arg ...string) *exec.Cmd {
		cmdArr := []string{name}
		cmdArr = append(cmdArr, arg...)
		return exec.Command("echo", cmdArr...)
	}

	// fakeReadDir returns a fixed listing: up/down migrator files for
	// versions 0-1-0, 0-3-0 and 0-5-0, plus one directory entry.
	fakeReadDir = func(name string) ([]os.DirEntry, error) {
		fileUpEntry1 := fakeDirEntry{name: "000001_0-1-0.up.sql", isDir: false}
		fileDownEntry1 := fakeDirEntry{name: "000001_0-1-0.down.sql", isDir: false}
		fileUpEntry2 := fakeDirEntry{name: "000002_0-3-0.up.sql", isDir: false}
		fileDownEntry2 := fakeDirEntry{name: "000002_0-3-0.down.sql", isDir: false}
		fileUpEntry3 := fakeDirEntry{name: "000003_0-5-0.up.sql", isDir: false}
		fileDownEntry3 := fakeDirEntry{name: "000003_0-5-0.down.sql", isDir: false}
		folderEntry := fakeDirEntry{name: "folder", isDir: true}
		return []os.DirEntry{fileUpEntry1, fileDownEntry1, fileUpEntry2, fileDownEntry2, fileUpEntry3, fileDownEntry3, folderEntry}, nil
	}

	// fakeGetEnv returns fixed values for the environment variables read by
	// the tool and "" for any other key. THEIA_VERSION is "0.4.0".
	fakeGetEnv = func(key string) string {
		switch key {
		case "MIGRATE_USERNAME":
			return "username"
		case "MIGRATE_PASSWORD":
			return "password"
		case "DB_URL":
			return "localhost:9000"
		case "THEIA_VERSION":
			return "0.4.0"
		default:
			return ""
		}
	}

	// fakeNewMigrate ignores both URLs and builds a Migrate instance on top of
	// the stub drivers, after loading up/down migrations for versions 1 to 3
	// into the stub source.
	fakeNewMigrate = func(sourceURL, databaseURL string) (*migrate.Migrate, error) {
		sourceStubMigrations := source.NewMigrations()
		sourceStubMigrations.Append(&source.Migration{Version: 1, Direction: source.Up, Identifier: "CREATE 1"})
		sourceStubMigrations.Append(&source.Migration{Version: 1, Direction: source.Down, Identifier: "DROP 1"})
		sourceStubMigrations.Append(&source.Migration{Version: 2, Direction: source.Up, Identifier: "CREATE 2"})
		sourceStubMigrations.Append(&source.Migration{Version: 2, Direction: source.Down, Identifier: "DROP 2"})
		sourceStubMigrations.Append(&source.Migration{Version: 3, Direction: source.Up, Identifier: "CREATE 3"})
		sourceStubMigrations.Append(&source.Migration{Version: 3, Direction: source.Down, Identifier: "DROP 3"})
		sourceInstance.(*sStub.Stub).Migrations = sourceStubMigrations

		return migrate.NewWithInstance("stub://", sourceInstance, "stub://", databaseInstance)
	}
)

// TestSchemaManagement installs the fake hooks and then runs the migration
// sequence checks followed by the error message checks.
func TestSchemaManagement(t *testing.T) {
	execCommand = fakeExecCommand
	readDir = fakeReadDir
	getEnv = fakeGetEnv
	newMigrate = fakeNewMigrate
	checkMigrations(t)
	checkErrorMsg(t)
}
// checkMigrations runs initMigration and startMigration against stub
// golang-migrate drivers and a mocked ClickHouse connection, and verifies
// that the migrations applied to the stub database match the expected
// sequence of each test case.
func checkMigrations(t *testing.T) {
	testcases := []struct {
		name string
		// ms is the sequence of migrations expected to be applied.
		ms migrationSequence
		// setDataVersion prepares the version stored in the stub database
		// before the migration starts.
		setDataVersion func()
		// showTablesRows is the mocked result of "SHOW TABLES".
		showTablesRows *sqlmock.Rows
		// oldVersionTablesRow, when set, is the mocked result of
		// "SELECT * FROM migrate_version".
		oldVersionTablesRow *sqlmock.Rows
	}{
		{
			name:           "No existing data schema",
			ms:             migrationSequence{},
			setDataVersion: func() {},
			showTablesRows: sqlmock.NewRows([]string{"table"}),
		},
		{
			name:           "Upgrading from v0.1.0 to v0.4.0",
			ms:             migrationSequence{mr("CREATE 1"), mr("CREATE 2")},
			setDataVersion: func() {},
			showTablesRows: sqlmock.NewRows([]string{"table"}).AddRow("flows"),
		},
		{
			name:                "Upgrading from v0.2.0 to v0.4.0",
			ms:                  migrationSequence{mr("CREATE 2")},
			setDataVersion:      func() {},
			showTablesRows:      sqlmock.NewRows([]string{"table"}).AddRow("flows").AddRow("migrate_version").AddRow("schema_migrations"),
			oldVersionTablesRow: sqlmock.NewRows([]string{"version"}).AddRow("0.2.0"),
		},
		{
			name:           "Downgrading from v0.6.0 to v0.4.0",
			ms:             migrationSequence{mr("DROP 3")},
			showTablesRows: sqlmock.NewRows([]string{"table"}).AddRow("schema_migrations"),
			setDataVersion: func() {
				databaseInstance.SetVersion(3, false)
			},
		},
		{
			name:           "No migration",
			ms:             migrationSequence{},
			showTablesRows: sqlmock.NewRows([]string{"table"}).AddRow("flows").AddRow("schema_migrations"),
			setDataVersion: func() {
				databaseInstance.SetVersion(2, false)
			},
		},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			// Every call to openSql yields a fresh mock that expects a ping,
			// the table listing and, optionally, the old version query.
			openSql = func(driverName, dataSourceName string) (*sql.DB, error) {
				db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true))
				if err != nil {
					return db, err
				}
				mock.ExpectPing()
				mock.ExpectQuery("SHOW TABLES").WillReturnRows(tc.showTablesRows)
				if tc.oldVersionTablesRow != nil {
					mock.ExpectQuery("SELECT * FROM migrate_version").WillReturnRows(tc.oldVersionTablesRow)
				}
				return db, err
			}
			var err error
			// Fresh stub drivers for every test case.
			sourceInstance, err = source.Open("stub://")
			assert.NoError(t, err, "error when creating stub source migrate")
			databaseInstance, err = database.Open("stub://")
			assert.NoError(t, err, "error when creating stub database migrate")
			clickhouseMigrate, err := initMigration()
			assert.NoErrorf(t, err, "error when initializing migrator: %v", err)
			// The version map is expected to be built from the file names
			// returned by fakeReadDir.
			assert.Equalf(t, 0, versionMap["0.1.0"], "version 0.1.0 should map to version 0")
			assert.Equalf(t, 1, versionMap["0.3.0"], "version 0.3.0 should map to version 1")
			assert.Equalf(t, 2, versionMap["0.5.0"], "version 0.5.0 should map to version 2")
			tc.setDataVersion()
			err = startMigration(clickhouseMigrate)
			assert.NoError(t, err, "error when migrating: %v", err)
			// Compare the migrations recorded by the stub database with the
			// expected bodies.
			bs := tc.ms.bodySequence()
			assert.True(t, databaseInstance.(*dStub.Stub).EqualSequence(bs), "error in migration sequence")
		})
	}
}
// checkErrorMsg verifies the error messages returned by initMigration and
// startMigration when one of the package-level hooks misbehaves. Each test
// case overrides at most a few hooks; the others fall back to the fakes.
func checkErrorMsg(t *testing.T) {
	testcases := []struct {
		name string
		// Optional hook overrides; a nil field keeps the default fake.
		execCommand func(name string, arg ...string) *exec.Cmd
		readDir     func(name string) ([]fs.DirEntry, error)
		getEnv      func(key string) string
		newMigrate  func(sourceURL string, databaseURL string) (*migrate.Migrate, error)
		// initExpectedErrorMsg, when set, must be contained in the error
		// returned by initMigration; startMigration is not run in that case.
		initExpectedErrorMsg string
		// startExpectedErrorMsg, when set, must be contained in the error
		// returned by startMigration.
		startExpectedErrorMsg string
	}{
		{
			name:                 "Environment variable not set",
			getEnv:               func(key string) string { return "" },
			initExpectedErrorMsg: "unable to load environment variables, MIGRATE_USERNAME, MIGRATE_PASSWORD and DB_URL must be defined",
		},
		{
			name: "Fail to create migration instance",
			newMigrate: func(sourceURL, databaseURL string) (*migrate.Migrate, error) {
				return nil, fmt.Errorf("")
			},
			initExpectedErrorMsg: "error when creating a Migrate instance for ClickHouse: ",
		},
		{
			name: "Fail to read directory",
			readDir: func(name string) ([]fs.DirEntry, error) {
				return nil, fmt.Errorf("")
			},
			initExpectedErrorMsg: "error when generating version number map: unable to get files in folder migrators: ",
		},
		{
			name: "Wrong migrator file name",
			readDir: func(name string) ([]fs.DirEntry, error) {
				file := fakeDirEntry{name: "0-1-0.up.sql", isDir: false}
				return []os.DirEntry{file}, nil
			},
			initExpectedErrorMsg: "error when generating version number map: error when parsing the version number: ",
		},
		{
			name: "Invalid theia version",
			getEnv: func(key string) string {
				if key == "THEIA_VERSION" {
					return "v0.1.0"
				} else {
					return fakeGetEnv(key)
				}
			},
			startExpectedErrorMsg: "error when getting Theia version: error when getting theia version number for v0.1.0: error when rounding up version: ",
		},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			// Reset every hook to its default fake, then apply the overrides
			// of this test case.
			execCommand = fakeExecCommand
			readDir = fakeReadDir
			getEnv = fakeGetEnv
			newMigrate = fakeNewMigrate
			if tc.execCommand != nil {
				execCommand = tc.execCommand
			}
			if tc.readDir != nil {
				readDir = tc.readDir
			}
			if tc.getEnv != nil {
				getEnv = tc.getEnv
			}
			if tc.newMigrate != nil {
				newMigrate = tc.newMigrate
			}
			openSql = func(driverName, dataSourceName string) (*sql.DB, error) {
				db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true))
				if err != nil {
					return db, err
				}
				mock.ExpectPing()
				mock.ExpectQuery("SHOW TABLES")
				return db, err
			}
			var err error
			sourceInstance, err = source.Open("stub://")
			assert.NoError(t, err, "error when creating stub source migrate")
			databaseInstance, err = database.Open("stub://")
			assert.NoError(t, err, "error when creating stub database migrate")
			clickhouseMigrate, err := initMigration()
			if tc.initExpectedErrorMsg != "" {
				assert.ErrorContains(t, err, tc.initExpectedErrorMsg)
			} else {
				// Initialization must succeed before the start phase can be
				// checked.
				assert.Nil(t, err)
				err = startMigration(clickhouseMigrate)
				if tc.startExpectedErrorMsg != "" {
					assert.ErrorContains(t, err, tc.startExpectedErrorMsg)
				} else {
					assert.Nil(t, err)
				}
			}

		})
	}
}
stderr) + // strip leading 'v' + assert.Contains(t, queryOutput, version[1:]) + } else { + require.Contains(t, queryOutput, "schema_migrations") + } } }