diff --git a/.github/audit.yml b/.github/audit.yml index 6a3007248db03..83db5d60151bd 100644 --- a/.github/audit.yml +++ b/.github/audit.yml @@ -11,9 +11,7 @@ jobs: security_audit: runs-on: ubuntu-20.04 steps: - - uses: actions/checkout@v3 - with: - submodules: "recursive" + - uses: actions/checkout@v1 - uses: actions-rs/audit-check@v1 with: token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/changes.yml b/.github/workflows/changes.yml index 4b6697fed8726..db6c4225073e0 100644 --- a/.github/workflows/changes.yml +++ b/.github/workflows/changes.yml @@ -119,8 +119,6 @@ jobs: k8s: ${{ steps.filter.outputs.k8s }} steps: - uses: actions/checkout@v3 - with: - submodules: "recursive" - uses: dorny/paths-filter@v2 id: filter @@ -214,8 +212,6 @@ jobs: webhdfs: ${{ steps.filter.outputs.webhdfs }} steps: - uses: actions/checkout@v3 - with: - submodules: "recursive" # creates a yaml file that contains the filters for each integration, # extracted from the output of the `vdev int ci-paths` command, which diff --git a/.github/workflows/cli.yml b/.github/workflows/cli.yml index 2801cc78bb945..9d22fda8527bb 100644 --- a/.github/workflows/cli.yml +++ b/.github/workflows/cli.yml @@ -28,13 +28,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - name: Cache Cargo registry + index uses: actions/cache@v3 diff --git a/.github/workflows/comment-trigger.yml b/.github/workflows/comment-trigger.yml index 8d3128ba14300..7be887247ce73 100644 --- a/.github/workflows/comment-trigger.yml +++ b/.github/workflows/comment-trigger.yml @@ -59,7 +59,7 @@ jobs: || contains(github.event.comment.body, '/ci-run-regression') ) steps: - - name: Validate issue comment + - name: Get PR comment author id: comment uses: tspascoal/get-user-teams-membership@v2 with: @@ -67,6 +67,10 @@ jobs: team: 'Vector' GITHUB_TOKEN: ${{ secrets.GH_PAT_ORG }} + - name: Validate author membership + if: steps.comment.outputs.isTeamMember == 'false' + run: exit 1 + cli: needs: validate if: contains(github.event.comment.body, '/ci-run-all') || contains(github.event.comment.body, '/ci-run-cli') diff --git a/.github/workflows/compilation-timings.yml b/.github/workflows/compilation-timings.yml index 94616e04af086..e96bea65ea946 100644 --- a/.github/workflows/compilation-timings.yml +++ b/.github/workflows/compilation-timings.yml @@ -17,8 +17,6 @@ jobs: steps: - uses: colpal/actions-clean@v1 - uses: actions/checkout@v3 - with: - submodules: "recursive" - run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - run: bash scripts/environment/prepare.sh - run: cargo clean @@ -35,8 +33,6 @@ jobs: steps: - uses: colpal/actions-clean@v1 - uses: actions/checkout@v3 - with: - submodules: "recursive" - run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - run: bash scripts/environment/prepare.sh - run: cargo clean @@ -48,8 +44,6 @@ jobs: steps: - uses: colpal/actions-clean@v1 - uses: actions/checkout@v3 - with: - submodules: "recursive" - run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - run: bash scripts/environment/prepare.sh - run: cargo clean @@ -61,8 +55,6 @@ jobs: steps: - uses: colpal/actions-clean@v1 - uses: actions/checkout@v3 - with: - submodules: "recursive" - run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - run: bash scripts/environment/prepare.sh - run: cargo clean @@ -76,8 +68,6 @@ jobs: 
steps: - uses: colpal/actions-clean@v1 - uses: actions/checkout@v3 - with: - submodules: "recursive" - run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - run: bash scripts/environment/prepare.sh - run: cargo clean diff --git a/.github/workflows/component_features.yml b/.github/workflows/component_features.yml index a888fe6c83d05..2705483f598c4 100644 --- a/.github/workflows/component_features.yml +++ b/.github/workflows/component_features.yml @@ -26,13 +26,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - run: bash scripts/environment/prepare.sh diff --git a/.github/workflows/cross.yml b/.github/workflows/cross.yml index 19b53a9fae27f..9d36cb4cb3240 100644 --- a/.github/workflows/cross.yml +++ b/.github/workflows/cross.yml @@ -39,13 +39,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - uses: actions/cache@v3 name: Cache Cargo registry + index @@ -85,13 +82,6 @@ jobs: needs: cross-linux if: needs.cross-linux.result == 'success' && github.event_name == 'issue_comment' steps: - - name: Validate issue comment - uses: tspascoal/get-user-teams-membership@v2 - with: - username: ${{ github.actor }} - team: 'Vector' - GITHUB_TOKEN: ${{ secrets.GH_PAT_ORG }} - - name: (PR comment) Get PR branch uses: xt0rted/pull-request-comment-branch@v2 id: comment-branch diff --git a/.github/workflows/environment.yml b/.github/workflows/environment.yml index 2c17383d169ac..cdddb0a980db8 100644 --- a/.github/workflows/environment.yml +++ b/.github/workflows/environment.yml @@ -34,13 +34,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - name: Set up QEMU uses: docker/setup-qemu-action@v2.2.0 diff --git a/.github/workflows/gardener_open_pr.yml b/.github/workflows/gardener_open_pr.yml index 701bfbacae7f4..e9b0fef67ba05 100644 --- a/.github/workflows/gardener_open_pr.yml +++ b/.github/workflows/gardener_open_pr.yml @@ -18,7 +18,7 @@ jobs: with: username: ${{ github.actor }} team: vector - GITHUB_TOKEN: ${{ secrets.GH_PROJECT_PAT }} + GITHUB_TOKEN: ${{ secrets.GH_PAT_ORG }} - uses: actions/add-to-project@v0.5.0 if: ${{ steps.checkVectorMember.outputs.isTeamMember == 'false' }} with: diff --git a/.github/workflows/gardener_remove_waiting_author.yml b/.github/workflows/gardener_remove_waiting_author.yml index 37f6665034a14..9fe063e50b40d 100644 --- a/.github/workflows/gardener_remove_waiting_author.yml +++ b/.github/workflows/gardener_remove_waiting_author.yml @@ -9,8 +9,6 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - with: - submodules: "recursive" - uses: actions-ecosystem/action-remove-labels@v1 with: labels: "meta: awaiting author" diff --git a/.github/workflows/install-sh.yml b/.github/workflows/install-sh.yml index 83d7f9517035e..045319a191642 100644 --- a/.github/workflows/install-sh.yml +++ b/.github/workflows/install-sh.yml @@ -28,13 +28,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ 
steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - run: pip3 install awscli --upgrade --user - env: diff --git a/.github/workflows/integration-comment.yml b/.github/workflows/integration-comment.yml index 629278ff95c20..a35f994f82f25 100644 --- a/.github/workflows/integration-comment.yml +++ b/.github/workflows/integration-comment.yml @@ -46,14 +46,18 @@ jobs: runs-on: ubuntu-latest if: contains(github.event.comment.body, '/ci-run-integration') || contains(github.event.comment.body, '/ci-run-all') steps: - - name: Validate issue comment - if: github.event_name == 'issue_comment' + - name: Get PR comment author + id: comment uses: tspascoal/get-user-teams-membership@v2 with: username: ${{ github.actor }} team: 'Vector' GITHUB_TOKEN: ${{ secrets.GH_PAT_ORG }} + - name: Validate author membership + if: steps.comment.outputs.isTeamMember == 'false' + run: exit 1 + - name: (PR comment) Get PR branch uses: xt0rted/pull-request-comment-branch@v2 id: comment-branch diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index ffb741158c87e..4d8d635fcb477 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -54,13 +54,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - run: sudo npm -g install @datadog/datadog-ci diff --git a/.github/workflows/k8s_e2e.yml b/.github/workflows/k8s_e2e.yml index 61196de8befa0..50de3f5db9a2c 100644 --- a/.github/workflows/k8s_e2e.yml +++ b/.github/workflows/k8s_e2e.yml @@ -88,13 +88,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - uses: actions/cache@v3 with: @@ -208,13 +205,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - uses: actions/download-artifact@v3 with: diff --git a/.github/workflows/misc.yml b/.github/workflows/misc.yml index 035bbbc92a66e..4af7ab44a3bef 100644 --- a/.github/workflows/misc.yml +++ b/.github/workflows/misc.yml @@ -28,13 +28,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - uses: actions/cache@v3 name: Cache Cargo registry + index diff --git a/.github/workflows/msrv.yml b/.github/workflows/msrv.yml index c910d2394053d..9a82d1ddecbd0 100644 --- a/.github/workflows/msrv.yml +++ b/.github/workflows/msrv.yml @@ -16,8 +16,6 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - with: - submodules: "recursive" - run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - run: cargo install cargo-msrv --version 0.15.1 - run: cargo msrv verify diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index e4ea253a558e8..e8cdfee808f6b 100644 --- a/.github/workflows/publish.yml +++ 
b/.github/workflows/publish.yml @@ -39,7 +39,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Generate publish metadata id: generate-publish-metadata run: make ci-generate-publish-metadata @@ -57,7 +56,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Bootstrap runner environment (Ubuntu-specific) run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - name: Bootstrap runner environment (generic) @@ -83,7 +81,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Bootstrap runner environment (Ubuntu-specific) run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - name: Bootstrap runner environment (generic) @@ -109,7 +106,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Bootstrap runner environment (Ubuntu-specific) run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - name: Bootstrap runner environment (generic) @@ -137,7 +133,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Bootstrap runner environment (Ubuntu-specific) run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - name: Bootstrap runner environment (generic) @@ -165,7 +160,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Bootstrap runner environment (Ubuntu-specific) run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - name: Bootstrap runner environment (generic) @@ -193,7 +187,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Bootstrap runner environment (Ubuntu-specific) run: sudo -E bash scripts/environment/bootstrap-ubuntu-20.04.sh - name: Bootstrap runner environment (generic) @@ -221,7 +214,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Bootstrap runner environment (macOS-specific) run: bash scripts/environment/bootstrap-macos-10.sh - name: Build Vector @@ -252,7 +244,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Bootstrap runner environment (Windows-specific) run: .\scripts\environment\bootstrap-windows-2019.ps1 - name: Install Wix @@ -320,11 +311,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - # Workaround for older OS images - # https://github.com/actions/checkout/issues/758 - - name: Checkout submodules - run: | - git submodule update --init --recursive - name: Download staged package artifacts (x86_64-unknown-linux-gnu) uses: actions/download-artifact@v3 with: @@ -381,7 +367,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Download staged package artifacts (x86_64-unknown-linux-gnu) uses: actions/download-artifact@v3 with: @@ -409,7 +394,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Download staged package artifacts (x86_64-apple-darwin) uses: actions/download-artifact@v3 with: @@ -440,7 +424,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Login to DockerHub uses: docker/login-action@v2.1.0 with: @@ -516,7 +499,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Download staged package artifacts (aarch64-unknown-linux-gnu) uses: 
actions/download-artifact@v3 with: @@ -588,7 +570,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Download staged package artifacts (aarch64-unknown-linux-gnu) uses: actions/download-artifact@v3 with: @@ -649,7 +630,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Publish update to Homebrew tap env: GITHUB_TOKEN: ${{ secrets.GH_PACKAGE_PUBLISHER_TOKEN }} @@ -675,7 +655,6 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ inputs.git_ref }} - submodules: "recursive" - name: Download staged package artifacts (aarch64-unknown-linux-gnu) uses: actions/download-artifact@v3 with: diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml index f3370293198ef..4d770d496486f 100644 --- a/.github/workflows/regression.yml +++ b/.github/workflows/regression.yml @@ -48,8 +48,6 @@ jobs: comment_valid: ${{ steps.comment.outputs.isTeamMember }} steps: - uses: actions/checkout@v3 - with: - submodules: "recursive" - name: Collect file changes id: changes @@ -131,7 +129,6 @@ jobs: - uses: actions/checkout@v3 with: fetch-depth: 1000 - submodules: "recursive" # If triggered by issue comment, the event payload doesn't directly contain the head and base sha from the PR. # But, we can retrieve this info from some commands. @@ -290,14 +287,11 @@ jobs: - uses: colpal/actions-clean@v1 - uses: actions/checkout@v3 - with: - submodules: "recursive" - uses: actions/checkout@v3 with: ref: ${{ needs.compute-metadata.outputs.baseline-sha }} path: baseline-vector - submodules: "recursive" - name: Set up Docker Buildx id: buildx @@ -330,14 +324,11 @@ jobs: - uses: colpal/actions-clean@v1 - uses: actions/checkout@v3 - with: - submodules: "recursive" - uses: actions/checkout@v3 with: ref: ${{ needs.compute-metadata.outputs.comparison-sha }} path: comparison-vector - submodules: "recursive" - name: Set up Docker Buildx id: buildx @@ -482,7 +473,6 @@ jobs: - uses: actions/checkout@v3 with: ref: ${{ needs.compute-metadata.outputs.comparison-sha }} - submodules: "recursive" - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v2.2.0 @@ -602,8 +592,6 @@ jobs: - compute-metadata steps: - uses: actions/checkout@v3 - with: - submodules: "recursive" - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v2.2.0 @@ -695,7 +683,6 @@ jobs: - uses: actions/checkout@v3 with: ref: ${{ needs.compute-metadata.outputs.comparison-sha }} - submodules: "recursive" - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v2.2.0 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7947529457c90..ca5184b5041c2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -44,7 +44,6 @@ jobs: with: # check-version needs tags fetch-depth: 0 # fetch everything - submodules: "recursive" - uses: actions/cache@v3 name: Cache Cargo registry + index diff --git a/.github/workflows/unit_mac.yml b/.github/workflows/unit_mac.yml index 22476d63efada..abda6ae1e177f 100644 --- a/.github/workflows/unit_mac.yml +++ b/.github/workflows/unit_mac.yml @@ -32,13 +32,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - uses: actions/cache@v3 name: Cache Cargo registry + index diff --git a/.github/workflows/unit_windows.yml 
b/.github/workflows/unit_windows.yml index 61128cf5a801a..479d18938e4ec 100644 --- a/.github/workflows/unit_windows.yml +++ b/.github/workflows/unit_windows.yml @@ -8,14 +8,6 @@ jobs: test-windows: runs-on: [windows, windows-2019-8core] steps: - - name: Validate issue comment - if: github.event_name == 'issue_comment' - uses: tspascoal/get-user-teams-membership@v2 - with: - username: ${{ github.actor }} - team: 'Vector' - GITHUB_TOKEN: ${{ secrets.GH_PAT_ORG }} - - name: (PR comment) Get PR branch if: ${{ github.event_name == 'issue_comment' }} uses: xt0rted/pull-request-comment-branch@v2 @@ -35,13 +27,10 @@ jobs: uses: actions/checkout@v3 with: ref: ${{ steps.comment-branch.outputs.head_ref }} - submodules: "recursive" - name: Checkout branch if: ${{ github.event_name != 'issue_comment' }} uses: actions/checkout@v3 - with: - submodules: "recursive" - run: .\scripts\environment\bootstrap-windows-2019.ps1 - run: make test diff --git a/Cargo.lock b/Cargo.lock index 1b0b771572fec..e96f6160f8dd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -260,7 +260,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c6368f9ae5c6ec403ca910327ae0c9437b0a85255b6950c90d497e6177f6e5e" dependencies = [ "proc-macro-hack", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -473,7 +473,7 @@ dependencies = [ "darling 0.14.2", "proc-macro-crate 1.2.1", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", "thiserror", ] @@ -593,7 +593,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -615,7 +615,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -632,7 +632,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -1417,7 +1417,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd9e32d7420c85055e8107e5b2463c4eeefeaac18b52359fe9f9c08a18f342b2" dependencies = [ - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -1561,7 +1561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61820b4c5693eafb998b1e67485423c923db4a75f72585c247bdee32bad81e7b" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -1572,7 +1572,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c76cdbfa13def20d1f8af3ae7b3c6771f06352a74221d8851262ac384c122b8e" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -1643,7 +1643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13e576ebe98e605500b3c8041bb888e966653577172df6dd97398714eb30b9bf" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -1726,7 +1726,7 @@ dependencies = [ "cached_proc_macro_types", "darling 0.14.2", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -1951,7 +1951,7 @@ checksum = "81d7dc0031c3a59a04fc2ba395c8e2dd463cba1859275f065d225f6122221b45" dependencies = [ "heck 0.4.0", 
"proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -2405,7 +2405,7 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" dependencies = [ - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -2459,7 +2459,7 @@ dependencies = [ "codespan-reporting", "once_cell", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "scratch", "syn 1.0.109", ] @@ -2477,7 +2477,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a08a6e2fcc370a089ad3b4aaf54db3b1b4cee38ddabce5896b33eb693275f470" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -2510,7 +2510,7 @@ dependencies = [ "fnv", "ident_case", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "strsim 0.10.0", "syn 1.0.109", ] @@ -2524,7 +2524,7 @@ dependencies = [ "fnv", "ident_case", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "strsim 0.10.0", "syn 1.0.109", ] @@ -2536,7 +2536,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ "darling_core 0.13.4", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -2547,7 +2547,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7618812407e9402654622dd402b0a89dff9ba93badd6540781526117b92aab7e" dependencies = [ "darling_core 0.14.2", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -2623,7 +2623,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -2634,7 +2634,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cdeb9ec472d588e539a818b2dee436825730da08ad0017c4b1a17676bdc8b7" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -2646,7 +2646,7 @@ checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" dependencies = [ "convert_case 0.4.0", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "rustc_version 0.4.0", "syn 1.0.109", ] @@ -2902,7 +2902,7 @@ checksum = "21cdad81446a7f7dc43f6a77409efeb9733d2fa65553efef6018ef257c959b73" dependencies = [ "heck 0.4.0", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -2914,7 +2914,7 @@ checksum = "c9720bba047d567ffc8a3cba48bf19126600e249ab7f128e9233e6376976a116" dependencies = [ "heck 0.4.0", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -2926,7 +2926,7 @@ checksum = "11f36e95862220b211a6e2aa5eca09b4fa391b13cd52ceb8035a24bf65a79de2" dependencies = [ "once_cell", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -2946,7 +2946,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e9a1f9f7d83e59740248a6e14ecf93929ade55027844dfcea78beafccc15745" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -3074,7 +3074,7 @@ checksum = "f47da3a72ec598d9c8937a7ebca8962a5c7a1f28444e38c2b33c771ba3f55f05" dependencies = [ "proc-macro-error", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -3170,7 +3170,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"e4c81935e123ab0741c4c4f0d9b8377e5fb21d3de7e062fa4b1263b1fbcba1ea" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -3351,7 +3351,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -3434,7 +3434,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb19fe8de3ea0920d282f7b77dd4227aea6b8b999b42cdf0ca41b2472b14443a" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -3540,7 +3540,7 @@ dependencies = [ "heck 0.4.0", "lazy_static", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "serde", "serde_json", "syn 1.0.109", @@ -4926,7 +4926,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -5452,7 +5452,7 @@ checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" dependencies = [ "proc-macro-crate 1.2.1", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -5464,7 +5464,7 @@ checksum = "96667db765a921f7b295ffee8b60472b686a51d4f21c2ee4ffdb94c7013b65a6" dependencies = [ "proc-macro-crate 1.2.1", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -5647,7 +5647,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -5891,7 +5891,7 @@ dependencies = [ "pest", "pest_meta", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -5979,7 +5979,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -6245,7 +6245,7 @@ checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ "proc-macro-error-attr", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", "version_check", ] @@ -6257,7 +6257,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "version_check", ] @@ -6366,7 +6366,7 @@ dependencies = [ "anyhow", "itertools 0.10.5", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -6395,7 +6395,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -6494,7 +6494,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b22a693222d716a9587786f37ac3f6b4faedb5b80c23914e7303ff5a1d8016e9" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -6509,9 +6509,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.28" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" dependencies = [ "proc-macro2 1.0.63", ] @@ -6909,7 +6909,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff26ed6c7c4dfc2aa9480b86a60e3c7233543a270a680e10758a507c5a4ce476" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -7409,7 +7409,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -7420,7 +7420,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -7472,7 +7472,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fe39d9fbb0ebf5eb2c7cb7e2a47e4f462fad1379f1166b8ae49ad9eae89a7ca" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -7531,7 +7531,7 @@ checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" dependencies = [ "darling 0.13.4", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -7543,7 +7543,7 @@ checksum = "859011bddcc11f289f07f467cc1fe01c7a941daa4d8f6c40d4d1c92eb6d9319c" dependencies = [ "darling 0.14.2", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -7814,7 +7814,7 @@ checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2" dependencies = [ "heck 0.4.0", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -7955,7 +7955,7 @@ dependencies = [ "heck 0.3.3", "proc-macro-error", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -7973,7 +7973,7 @@ checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ "heck 0.4.0", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "rustversion", "syn 1.0.109", ] @@ -8012,7 +8012,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "unicode-ident", ] @@ -8023,7 +8023,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5aad1363ed6d37b84299588d62d3a7d95b5a5c2d9aad5c85609fda12afaa1f40" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "unicode-ident", ] @@ -8040,7 +8040,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", "unicode-xid 0.2.4", ] @@ -8194,7 +8194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -8340,7 +8340,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -8561,7 +8561,7 @@ dependencies = 
[ "prettyplease", "proc-macro2 1.0.63", "prost-build", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -8665,7 +8665,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -8936,7 +8936,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89851716b67b937e393b3daa8423e67ddfc4bbbf1654bcf05488e95e0828db0c" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -8966,7 +8966,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c3e1c30cedd24fc597f7d37a721efdbdc2b1acae012c1ef1218f4c7c2c0f3e7" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", ] @@ -9506,7 +9506,7 @@ dependencies = [ "darling 0.13.4", "once_cell", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "serde", "serde_json", "syn 1.0.109", @@ -9519,7 +9519,7 @@ version = "0.1.0" dependencies = [ "darling 0.13.4", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "serde", "serde_derive_internals", "syn 1.0.109", @@ -9779,7 +9779,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d257817081c7dffcdbab24b9e62d2def62e2ff7d00b1c20062551e6cccc145ff" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", ] [[package]] @@ -9880,7 +9880,7 @@ dependencies = [ "log", "once_cell", "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", "wasm-bindgen-shared", ] @@ -9903,7 +9903,7 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" dependencies = [ - "quote 1.0.28", + "quote 1.0.29", "wasm-bindgen-macro-support", ] @@ -9914,7 +9914,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 2.0.10", "wasm-bindgen-backend", "wasm-bindgen-shared", @@ -10325,7 +10325,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", ] @@ -10345,7 +10345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" dependencies = [ "proc-macro2 1.0.63", - "quote 1.0.28", + "quote 1.0.29", "syn 1.0.109", "synstructure", ] diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index 5326901b16bc0..7317a47e0ea70 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -94,7 +94,7 @@ rand = "0.8.5" rand_distr = "0.4.3" tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt", "ansi", "registry"] } vector-common = { path = "../vector-common", default-features = false, features = ["test"] } -vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua"] } +vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua", "test"] } [features] api = ["dep:async-graphql"] diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 3ff5152a293a7..71786155d1d8f 100644 
--- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, fmt, num::NonZeroUsize}; use bitmask_enum::bitmask; @@ -111,7 +112,7 @@ pub struct SourceOutput { // NOTE: schema definitions are only implemented/supported for log-type events. There is no // inherent blocker to support other types as well, but it'll require additional work to add // the relevant schemas, and store them separately in this type. - pub schema_definition: Option<schema::Definition>, + pub schema_definition: Option<Arc<schema::Definition>>, } impl SourceOutput { @@ -129,7 +130,7 @@ impl SourceOutput { Self { port: None, ty, - schema_definition: Some(schema_definition), + schema_definition: Some(Arc::new(schema_definition)), } } @@ -168,17 +169,15 @@ impl SourceOutput { /// Schema enabled is set in the user's configuration. #[must_use] pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> { + use std::ops::Deref; + self.schema_definition.as_ref().map(|definition| { if schema_enabled { - definition.clone() + definition.deref().clone() } else { let mut new_definition = schema::Definition::default_for_namespace(definition.log_namespaces()); - - if definition.log_namespaces().contains(&LogNamespace::Vector) { - new_definition.add_meanings(definition.meanings()); - } - + new_definition.add_meanings(definition.meanings()); new_definition } }) @@ -203,7 +202,7 @@ pub struct TransformOutput { /// enabled, at least one definition should be output. If the transform /// has multiple connected sources, it is possible to have multiple output /// definitions - one for each input. - log_schema_definitions: HashMap<OutputId, schema::Definition>, + pub log_schema_definitions: HashMap<OutputId, schema::Definition>, } impl TransformOutput { @@ -245,11 +244,7 @@ .map(|(output, definition)| { let mut new_definition = schema::Definition::default_for_namespace(definition.log_namespaces()); - - if definition.log_namespaces().contains(&LogNamespace::Vector) { - new_definition.add_meanings(definition.meanings()); - } - + new_definition.add_meanings(definition.meanings()); (output.clone(), new_definition) }) .collect() @@ -606,7 +601,10 @@ mod test { // There should be the default legacy definition without schemas enabled. assert_eq!( - Some(schema::Definition::default_legacy_namespace()), + Some( + schema::Definition::default_legacy_namespace() .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork") ), output.schema_definition(false) ); }
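Worth pausing on the `schema_definition` hunks above: with the `LogNamespace::Vector` guard removed, semantic meanings now survive schema-disabled mode under every namespace, not just the Vector one (which is exactly what the updated `zork` test asserts). A minimal sketch of the resulting behavior, using a hypothetical stand-in `Definition` type rather than the real `schema::Definition`:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Definition {
    fields: Vec<&'static str>,   // stand-in for type information
    meanings: Vec<&'static str>, // stand-in for semantic meanings
}

// Mirrors the new `schema_definition(false)` branch: type information is
// reset to the namespace default, but meanings are always carried over.
fn disabled_definition(def: &Definition) -> Definition {
    Definition {
        fields: Vec::new(),
        meanings: def.meanings.clone(),
    }
}

fn main() {
    let def = Definition {
        fields: vec!["message: string"],
        meanings: vec!["zork"],
    };
    let disabled = disabled_definition(&def);
    assert!(disabled.fields.is_empty());
    assert_eq!(disabled.meanings, def.meanings);
}
```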
diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index f13bee6a5e009..d86884be7582c 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -7,7 +7,10 @@ use vector_common::{config::ComponentKey, EventDataEq}; use vrl::value::{Kind, Secrets, Value}; use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus}; -use crate::{config::LogNamespace, schema, ByteSizeOf}; +use crate::{ + config::{LogNamespace, OutputId}, + schema, ByteSizeOf, +}; const DATADOG_API_KEY: &str = "datadog_api_key"; const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token"; @@ -30,8 +33,15 @@ pub struct EventMetadata { /// The id of the source source_id: Option<Arc<ComponentKey>>, + /// The id of the component this event originated from. This is used to + /// determine which schema definition to attach to an event in transforms. + /// This should always have a value set for events in transforms. It will always be `None` + /// in a source, and there is currently no use-case for reading the value in a sink. + upstream_id: Option<Arc<OutputId>>, + /// An identifier for a globally registered schema definition which provides information about /// the event shape (type information, and semantic meaning of fields). + /// This definition is currently only valid for logs, and shouldn't be used for other event types. /// /// TODO(Jean): must not skip serialization to track schemas across restarts. #[serde(default = "default_schema_definition", skip)] @@ -71,17 +81,29 @@ impl EventMetadata { &mut self.secrets } - /// Returns a reference to the metadata source. + /// Returns a reference to the metadata source id. #[must_use] pub fn source_id(&self) -> Option<&Arc<ComponentKey>> { self.source_id.as_ref() } + /// Returns a reference to the metadata parent id. This is the `OutputId` + /// of the previous component the event was sent through (if any). + #[must_use] + pub fn upstream_id(&self) -> Option<&OutputId> { + self.upstream_id.as_deref() + } + /// Sets the `source_id` in the metadata to the provided value. pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) { self.source_id = Some(source_id); } + /// Sets the `upstream_id` in the metadata to the provided value. + pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) { + self.upstream_id = Some(upstream_id); + } + /// Return the datadog API key, if it exists pub fn datadog_api_key(&self) -> Option<Arc<str>> { self.secrets.get(DATADOG_API_KEY).cloned() @@ -111,6 +133,7 @@ impl Default for EventMetadata { finalizers: Default::default(), schema_definition: default_schema_definition(), source_id: None, + upstream_id: None, } } }
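The accessor pair added here follows a small but deliberate pattern: the field is stored as `Option<Arc<OutputId>>` so one id can be shared cheaply across many events, while `as_deref` exposes it as `Option<&OutputId>`. A self-contained sketch of that shape (the types here are simplified stand-ins, not the vector-core ones):

```rust
use std::sync::Arc;

#[derive(Debug, PartialEq)]
struct OutputId {
    component: String,
    port: Option<String>,
}

struct Metadata {
    upstream_id: Option<Arc<OutputId>>,
}

impl Metadata {
    // Same shape as `EventMetadata::upstream_id` above: `as_deref` turns the
    // stored `Option<Arc<OutputId>>` into `Option<&OutputId>`, so callers can
    // inspect the id without cloning the Arc.
    fn upstream_id(&self) -> Option<&OutputId> {
        self.upstream_id.as_deref()
    }
}

fn main() {
    let m = Metadata {
        upstream_id: Some(Arc::new(OutputId {
            component: "remap0".into(),
            port: None,
        })),
    };
    assert_eq!(m.upstream_id().map(|id| id.component.as_str()), Some("remap0"));
}
```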
diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index ae2e51e8a23a8..9547f58dc5ed3 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, }; -use crate::ByteSizeOf; +use crate::{config::OutputId, ByteSizeOf}; pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray}; pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf; pub use finalization::{ @@ -309,12 +309,24 @@ impl Event { self.metadata_mut().set_source_id(source_id); } + /// Sets the `upstream_id` in the event metadata to the provided value. + pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) { + self.metadata_mut().set_upstream_id(upstream_id); + } + /// Sets the `source_id` in the event metadata to the provided value. #[must_use] pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self { self.metadata_mut().set_source_id(source_id); self } + + /// Sets the `upstream_id` in the event metadata to the provided value. + #[must_use] + pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self { + self.metadata_mut().set_upstream_id(upstream_id); + self + } } impl EventDataEq for Event { 
diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index a60cd85c8200a..af81c51aa69a1 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, error, pin::Pin}; use futures::{Stream, StreamExt}; @@ -7,13 +8,16 @@ use vector_common::internal_event::{ use vector_common::json_size::JsonSize; use vector_common::EventDataEq; +use crate::config::{ComponentKey, OutputId}; +use crate::event::EventMutRef; +use crate::schema::Definition; use crate::{ config, event::{ into_event_stream, EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventRef, }, fanout::{self, Fanout}, - ByteSizeOf, + schema, ByteSizeOf, }; #[cfg(any(feature = "lua"))] @@ -178,6 +182,8 @@ impl SyncTransform for Box<dyn SyncTransform + Send> { struct TransformOutput { fanout: Fanout, events_sent: Registered<EventsSent>, + log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>, + output_id: Arc<OutputId>, } pub struct TransformOutputs { @@ -189,6 +195,7 @@ impl TransformOutputs { pub fn new( outputs_in: Vec<config::TransformOutput>, + component_key: &ComponentKey, ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) { let outputs_spec = outputs_in.clone(); let mut primary_output = None; @@ -197,6 +204,13 @@ for output in outputs_in { let (fanout, control) = Fanout::new(); + + let log_schema_definitions = output + .log_schema_definitions + .into_iter() + .map(|(id, definition)| (id, Arc::new(definition))) + .collect(); + match output.port { None => { primary_output = Some(TransformOutput { fanout, events_sent: register(EventsSent::from(internal_event::Output(Some( DEFAULT_OUTPUT.into(), )))), + log_schema_definitions, + output_id: Arc::new(OutputId { + component: component_key.clone(), + port: None, + }), }); controls.insert(None, control); } @@ -215,6 +234,11 @@ events_sent: register(EventsSent::from(internal_event::Output(Some( name.clone().into(), )))), + log_schema_definitions, + output_id: Arc::new(OutputId { + component: component_key.clone(), + port: Some(name.clone()), + }), }, ); controls.insert(Some(name.clone()), control); @@ -246,31 +270,61 @@ buf: &mut TransformOutputsBuf, ) -> Result<(), Box<dyn error::Error + Send + Sync>> { if let Some(primary) = self.primary_output.as_mut() { - let count = buf.primary_buffer.as_ref().map_or(0, OutputBuffer::len); - let byte_size = buf.primary_buffer.as_ref().map_or( - JsonSize::new(0), - EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of, - ); - buf.primary_buffer - .as_mut() - .expect("mismatched outputs") - .send(&mut primary.fanout) - .await?; - primary.events_sent.emit(CountByteSize(count, byte_size)); + let buf = buf.primary_buffer.as_mut().expect("mismatched outputs"); + Self::send_single_buffer(buf, primary).await?; } - for (key, buf) in &mut buf.named_buffers { - let count = buf.len(); - let byte_size = buf.estimated_json_encoded_size_of(); let output = self.named_outputs.get_mut(key).expect("unknown output"); - buf.send(&mut output.fanout).await?; - output.events_sent.emit(CountByteSize(count, byte_size)); + Self::send_single_buffer(buf, output).await?; } + Ok(()) + } + async fn send_single_buffer( buf: &mut OutputBuffer, output: &mut TransformOutput, ) -> Result<(), Box<dyn error::Error + Send + Sync>> { + for event in buf.events_mut() {
update_runtime_schema_definition( + event, + &output.output_id, + &output.log_schema_definitions, + ); + } + let count = buf.len(); + let byte_size = buf.estimated_json_encoded_size_of(); + buf.send(&mut output.fanout).await?; + output.events_sent.emit(CountByteSize(count, byte_size)); Ok(()) } } +#[allow(clippy::implicit_hasher)] +/// `event`: The event that will be updated +/// `output_id`: The `output_id` that the current event is being sent to (will be used as the new `parent_id`) +/// `log_schema_definitions`: A mapping of parent `OutputId` to definitions, that will be used to look up the new runtime definition of the event +pub fn update_runtime_schema_definition( + mut event: EventMutRef, + output_id: &Arc<OutputId>, + log_schema_definitions: &HashMap<OutputId, Arc<schema::Definition>>, +) { + if let EventMutRef::Log(log) = &mut event { + if let Some(parent_component_id) = log.metadata().upstream_id() { + if let Some(definition) = log_schema_definitions.get(parent_component_id) { + log.metadata_mut().set_schema_definition(definition); + } + } else { + // there is no parent defined. That means this event originated from a component that + // isn't able to track the source, such as `reduce` or `lua`. In these cases, all of the + // schema definitions _must_ be the same, so the first one is picked + if let Some(definition) = log_schema_definitions.values().next() { + log.metadata_mut().set_schema_definition(definition); + } + } + } + event.metadata_mut().set_upstream_id(Arc::clone(output_id)); +} + #[derive(Debug, Clone)] pub struct TransformOutputsBuf { primary_buffer: Option<OutputBuffer>, named_buffers: HashMap<String, OutputBuffer>, @@ -299,34 +353,17 @@ impl TransformOutputsBuf { } } - pub fn push(&mut self, event: Event) { - self.primary_buffer - .as_mut() - .expect("no default output") - .push(event); - } - - pub fn push_named(&mut self, name: &str, event: Event) { - self.named_buffers - .get_mut(name) - .expect("unknown output") - .push(event); - } - - pub fn append(&mut self, slice: &mut Vec<Event>) { - self.primary_buffer - .as_mut() - .expect("no default output") - .append(slice); - } - - pub fn append_named(&mut self, name: &str, slice: &mut Vec<Event>) { - self.named_buffers - .get_mut(name) - .expect("unknown output") - .append(slice); + /// Adds a new event to the transform output buffer + pub fn push(&mut self, name: Option<&str>, event: Event) { + match name { + Some(name) => self.named_buffers.get_mut(name), + None => self.primary_buffer.as_mut(), + } + .expect("unknown output") + .push(event); } + #[cfg(any(feature = "test", test))] pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ { self.primary_buffer .as_mut() .expect("no default output") .drain() } + #[cfg(any(feature = "test", test))] pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ { self.named_buffers .get_mut(name) .expect("unknown output") .drain() } - pub fn extend(&mut self, events: impl Iterator<Item = Event>) { - self.primary_buffer - .as_mut() - .expect("no default output") - .extend(events); - } - + #[cfg(any(feature = "test", test))] pub fn take_primary(&mut self) -> OutputBuffer { std::mem::take(self.primary_buffer.as_mut().expect("no default output")) } + #[cfg(any(feature = "test", test))] pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> { std::mem::take(&mut self.named_buffers) } - - pub fn len(&self) -> usize { - self.primary_buffer.as_ref().map_or(0, OutputBuffer::len) - + self - .named_buffers - .values() - .map(OutputBuffer::len) - .sum::<usize>() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } } impl ByteSizeOf for TransformOutputsBuf {
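The fallback branch in `update_runtime_schema_definition` above is the subtle part: an event with no `upstream_id` (one that came through a component like `reduce` or `lua` that can't track its source) gets whichever definition comes first, which is only sound because all the definitions must then be identical. The same decision logic, reduced to a standalone, testable sketch with stand-in types:

```rust
use std::collections::HashMap;
use std::sync::Arc;

type OutputId = String;   // stand-in for vector-core's OutputId
type Definition = String; // stand-in for schema::Definition

// Prefer the definition keyed by the event's upstream id; fall back to an
// arbitrary one when no upstream id is present.
fn pick_definition<'a>(
    upstream_id: Option<&OutputId>,
    defs: &'a HashMap<OutputId, Arc<Definition>>,
) -> Option<&'a Arc<Definition>> {
    match upstream_id {
        Some(id) => defs.get(id),
        None => defs.values().next(),
    }
}

fn main() {
    let mut defs = HashMap::new();
    defs.insert("in".to_string(), Arc::new("legacy definition".to_string()));
    assert!(pick_definition(Some(&"in".to_string()), &defs).is_some());
    assert!(pick_definition(None, &defs).is_some()); // fallback path
}
```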
@@ -439,6 +459,7 @@ impl OutputBuffer { }) } + #[cfg(any(feature = "test", test))] pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ { self.0.drain(..).flat_map(EventArray::into_events) } @@ -458,12 +479,12 @@ impl OutputBuffer { self.0.iter().flat_map(EventArray::iter_events) } - pub fn into_events(self) -> impl Iterator<Item = Event> { - self.0.into_iter().flat_map(EventArray::into_events) + fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef> { + self.0.iter_mut().flat_map(EventArray::iter_events_mut) } - pub fn take_events(&mut self) -> Vec<EventArray> { - std::mem::take(&mut self.0) + pub fn into_events(self) -> impl Iterator<Item = Event> { + self.0.into_iter().flat_map(EventArray::into_events) } } 
diff --git a/src/config/transform.rs b/src/config/transform.rs index c2be848d53361..1b9f442ef0786 100644 --- a/src/config/transform.rs +++ b/src/config/transform.rs @@ -195,6 +195,9 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + &self, enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], + + // This only exists for transforms that create logs from non-logs, to know which namespace + // to use, such as `metric_to_log` global_log_namespace: LogNamespace, ) -> Vec<TransformOutput>; 
diff --git a/src/sinks/azure_common/mod.rs b/src/sinks/azure_common/mod.rs index f3e4ff73ebe40..4d1c931977f03 100644 --- a/src/sinks/azure_common/mod.rs +++ b/src/sinks/azure_common/mod.rs @@ -1,3 +1,3 @@ -pub(crate) mod config; -pub(crate) mod service; -pub(crate) mod sink; +pub mod config; +pub mod service; +pub mod sink; 
diff --git a/src/sinks/azure_common/service.rs b/src/sinks/azure_common/service.rs index 122bd66525b18..eed5a068c48fa 100644 --- a/src/sinks/azure_common/service.rs +++ b/src/sinks/azure_common/service.rs @@ -12,7 +12,7 @@ use tracing::Instrument; use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse}; #[derive(Clone)] -pub(crate) struct AzureBlobService { +pub struct AzureBlobService { client: Arc<ContainerClient>, } 
diff --git a/src/sinks/s3_common/mod.rs b/src/sinks/s3_common/mod.rs index 193857740a297..37a3669c195ca 100644 --- a/src/sinks/s3_common/mod.rs +++ b/src/sinks/s3_common/mod.rs @@ -1,4 +1,4 @@ -pub(crate) mod config; -pub(crate) mod partitioner; -pub(crate) mod service; -pub(crate) mod sink; +pub mod config; +pub mod partitioner; +pub mod service; +pub mod sink; 
diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index a4f4eaae3b751..fea4a3980b64d 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -1,4 +1,5 @@ #![allow(missing_docs)] +use std::sync::Arc; use std::{collections::HashMap, fmt}; use chrono::Utc; @@ -19,6 +20,8 @@ use vrl::value::Value; mod errors; +use crate::config::{ComponentKey, OutputId}; +use crate::schema::Definition; pub use errors::{ClosedError, StreamSendError}; use lookup::PathPrefix; @@ -48,17 +51,37 @@ impl Builder { } } - pub fn add_source_output(&mut self, output: SourceOutput) -> LimitedReceiver<EventArray> { + pub fn add_source_output( + &mut self, + output: SourceOutput, + component_key: ComponentKey, + ) -> LimitedReceiver<EventArray> { let lag_time = self.lag_time.clone(); + let log_definition = output.schema_definition.clone(); + let output_id = OutputId { + component: component_key, + port: output.port.clone(), + }; match output.port { None => { - let (inner, rx) = - Inner::new_with_buffer(self.buf_size, DEFAULT_OUTPUT.to_owned(), lag_time); + let (inner, rx) = Inner::new_with_buffer( + self.buf_size, + DEFAULT_OUTPUT.to_owned(), + lag_time, + log_definition, + output_id, + ); self.inner = Some(inner); rx } Some(name) => { - let (inner, rx) =
Inner::new_with_buffer(self.buf_size, name.clone(), lag_time); + let (inner, rx) = Inner::new_with_buffer( + self.buf_size, + name.clone(), + lag_time, + log_definition, + output_id, + ); self.named_inners.insert(name, inner); rx } @@ -91,9 +114,15 @@ impl SourceSender { } } - pub fn new_with_buffer(n: usize) -> (Self, LimitedReceiver<EventArray>) { + #[cfg(test)] + pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver<EventArray>) { let lag_time = Some(register_histogram!(LAG_TIME_NAME)); + let output_id = OutputId { + component: "test".to_string().into(), + port: None, + }; + let (inner, rx) = + Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id); ( Self { inner: Some(inner), @@ -105,14 +134,14 @@ impl SourceSender { #[cfg(test)] pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); let recv = recv.into_stream().flat_map(into_event_stream); (pipe, recv) } #[cfg(test)] pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -131,7 +160,7 @@ impl SourceSender { pub fn new_test_errors( error_at: impl Fn(usize) -> bool, ) -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -161,7 +190,11 @@ impl SourceSender { ) -> impl Stream<Item = EventArray> + Unpin { // The lag_time parameter here will need to be filled in if this function is ever used for // non-test situations. - let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None); + let output_id = OutputId { + component: "test".to_string().into(), + port: Some(name.clone()), + }; + let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None, None, output_id); let recv = recv.into_stream().map(move |mut events| { events.iter_events_mut().for_each(|mut event| { let metadata = event.metadata_mut(); @@ -225,6 +258,11 @@ struct Inner { output: String, lag_time: Option<Histogram>, events_sent: Registered<EventsSent>, + /// The schema definition that will be attached to Log events sent through here + log_definition: Option<Arc<Definition>>, + /// The OutputId related to this source sender. This is set as the `upstream_id` in + /// `EventMetadata` for all events sent through here. 
+ output_id: Arc<OutputId>, } impl fmt::Debug for Inner { @@ -242,6 +280,8 @@ impl Inner { n: usize, output: String, lag_time: Option<Histogram>, + log_definition: Option<Arc<Definition>>, + output_id: OutputId, ) -> (Self, LimitedReceiver<EventArray>) { let (tx, rx) = channel::limited(n); ( Self { inner: tx, output: output.clone(), lag_time, events_sent: register!(EventsSent::from(internal_event::Output(Some( output.into() )))), + log_definition, + output_id: Arc::new(output_id), }, rx, ) } - async fn send(&mut self, events: EventArray) -> Result<(), ClosedError> { + async fn send(&mut self, mut events: EventArray) -> Result<(), ClosedError> { let reference = Utc::now().timestamp_millis(); events .iter_events() .for_each(|event| self.emit_lag_time(event, reference)); + + events.iter_events_mut().for_each(|mut event| { + // attach runtime schema definitions from the source + if let Some(log_definition) = &self.log_definition { + event.metadata_mut().set_schema_definition(log_definition); + } + event + .metadata_mut() + .set_upstream_id(Arc::clone(&self.output_id)); + }); + let byte_size = events.estimated_json_encoded_size_of(); let count = events.len(); self.inner.send(events).await.map_err(|_| ClosedError)?; @@ -290,23 +343,10 @@ impl Inner { E: Into<EventArray> + ByteSizeOf, I: IntoIterator<Item = E>, { - let reference = Utc::now().timestamp_millis(); let events = events.into_iter().map(Into::into); for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { - events - .iter_events() - .for_each(|event| self.emit_lag_time(event, reference)); - let cbs = CountByteSize(events.len(), events.estimated_json_encoded_size_of()); - match self.inner.send(events).await { - Ok(()) => { - self.events_sent.emit(cbs); - } - Err(error) => { - return Err(error.into()); - } - } + self.send(events).await?; } - Ok(()) } 
diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index ca904de314a62..8ba6511c53557 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1121,7 +1121,7 @@ mod integration_test { delay: Duration, status: EventStatus, ) -> (SourceSender, impl Stream<Item = EventArray> + Unpin) { - let (pipe, recv) = SourceSender::new_with_buffer(100); + let (pipe, recv) = SourceSender::new_test_sender_with_buffer(100); let recv = BufferReceiver::new(recv.into()).into_stream(); let recv = recv.then(move |mut events| async move { events.iter_logs_mut().for_each(|log| { 
diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 798759fa1138d..fc538efad199d 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -10,10 +10,12 @@ use opentelemetry_proto::proto::{ }; use similar_asserts::assert_eq; use std::collections::BTreeMap; +use std::sync::Arc; use tonic::Request; use vector_core::config::LogNamespace; use vrl::value; +use crate::config::OutputId; use crate::{ config::{SourceConfig, SourceContext}, event::{into_event_stream, Event, EventStatus, LogEvent, Value}, @@ -269,7 +271,11 @@ async fn receive_grpc_logs_legacy_namespace() { ("observed_timestamp", Utc.timestamp_nanos(2).into()), ("source_type", "opentelemetry".into()), ]); - let expect_event = Event::from(LogEvent::from(expect_vec)); + let mut expect_event = Event::from(LogEvent::from(expect_vec)); + expect_event.set_upstream_id(Arc::new(OutputId { + component: "test".into(), + port: Some("logs".into()), + })); assert_eq!(actual_event, expect_event); }) .await;
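Before the call-site renames below: the `Inner::send` change above is where a source's output definition and `OutputId` get stamped onto every outgoing event, and `send_batch` now funnels through `send` so the stamping (and lag-time emission) happens in exactly one place. A rough sketch of that stamping step, with simplified stand-ins for the metadata types:

```rust
use std::sync::Arc;

#[derive(Clone, Debug, PartialEq)]
struct Definition(&'static str);
#[derive(Clone, Debug, PartialEq)]
struct OutputId(&'static str);

#[derive(Default, Debug)]
struct Metadata {
    schema_definition: Option<Arc<Definition>>,
    upstream_id: Option<Arc<OutputId>>,
}

// Mirrors what `Inner::send` does before handing the batch to the channel.
fn stamp(batch: &mut [Metadata], log_definition: Option<&Arc<Definition>>, output_id: &Arc<OutputId>) {
    for event in batch {
        if let Some(definition) = log_definition {
            event.schema_definition = Some(Arc::clone(definition));
        }
        event.upstream_id = Some(Arc::clone(output_id));
    }
}

fn main() {
    let definition = Arc::new(Definition("source definition"));
    let output_id = Arc::new(OutputId("in"));
    let mut batch = vec![Metadata::default(), Metadata::default()];
    stamp(&mut batch, Some(&definition), &output_id);
    assert!(batch.iter().all(|m| m.upstream_id.as_deref() == Some(&OutputId("in"))));
}
```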
diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs
index 93366629f3420..58ef30c3fcf99 100644
--- a/src/sources/socket/mod.rs
+++ b/src/sources/socket/mod.rs
@@ -727,7 +727,7 @@ mod test {
        // shutdown.
        let addr = next_addr();
 
-        let (source_tx, source_rx) = SourceSender::new_with_buffer(10_000);
+        let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000);
        let source_key = ComponentKey::from("tcp_shutdown_infinite_stream");
        let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx);
diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs
index 467dae41fec30..a1ae446c56d36 100644
--- a/src/sources/statsd/mod.rs
+++ b/src/sources/statsd/mod.rs
@@ -453,7 +453,7 @@ mod test {
        // packet we send has a lot of metrics per packet. We could technically count them all up
        // and have a more accurate number here, but honestly, who cares? This is big enough.
        let component_key = ComponentKey::from("statsd");
-        let (tx, rx) = SourceSender::new_with_buffer(4096);
+        let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let sink = statsd_config
            .build(source_ctx)
@@ -547,7 +547,7 @@ mod test {
        // packet we send has a lot of metrics per packet. We could technically count them all up
        // and have a more accurate number here, but honestly, who cares? This is big enough.
        let component_key = ComponentKey::from("statsd");
-        let (tx, _rx) = SourceSender::new_with_buffer(4096);
+        let (tx, _rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let sink = statsd_config
            .build(source_ctx)
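All of these test call sites move from `new_with_buffer` to `new_test_sender_with_buffer` because, per the `source_sender` hunk above, the production constructor now also takes an optional log definition and an `OutputId`. Below is a hedged sketch of that constructor split using toy types, not Vector's real `SourceSender` (the real constructor also takes the output name and a lag-time histogram, omitted here for brevity; the placeholder `"test"` component is an assumption consistent with the `OutputId::from("test")` stamps on expected events later in this diff):

```rust
use std::sync::Arc;

#[derive(Debug, Clone)]
struct OutputId {
    component: String,
    port: Option<String>,
}

struct Sender {
    buffer: usize,
    log_definition: Option<Arc<String>>, // stand-in for Arc<schema::Definition>
    output_id: Arc<OutputId>,
}

impl Sender {
    // Production path: callers must identify the output this sender feeds.
    fn new_with_buffer(
        n: usize,
        log_definition: Option<Arc<String>>,
        output_id: OutputId,
    ) -> Self {
        Sender {
            buffer: n,
            log_definition,
            output_id: Arc::new(output_id),
        }
    }

    // Test-only convenience: placeholder identity, no schema definition.
    fn new_test_sender_with_buffer(n: usize) -> Self {
        Self::new_with_buffer(
            n,
            None,
            OutputId { component: "test".into(), port: None },
        )
    }
}

fn main() {
    let sender = Sender::new_test_sender_with_buffer(4096);
    assert_eq!(sender.output_id.component, "test");
    assert!(sender.log_definition.is_none());
    assert_eq!(sender.buffer, 4096);
}
```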
diff --git a/src/test_util/mock/mod.rs b/src/test_util/mock/mod.rs
index 62b0d96d76f10..3fb594b677e7a 100644
--- a/src/test_util/mock/mod.rs
+++ b/src/test_util/mock/mod.rs
@@ -30,12 +30,12 @@ pub fn backpressure_source(counter: &Arc<AtomicUsize>) -> BackpressureSourceConfig
 }
 
 pub fn basic_source() -> (SourceSender, BasicSourceConfig) {
-    let (tx, rx) = SourceSender::new_with_buffer(1);
+    let (tx, rx) = SourceSender::new_test_sender_with_buffer(1);
     (tx, BasicSourceConfig::new(rx))
 }
 
 pub fn basic_source_with_data(data: &str) -> (SourceSender, BasicSourceConfig) {
-    let (tx, rx) = SourceSender::new_with_buffer(1);
+    let (tx, rx) = SourceSender::new_test_sender_with_buffer(1);
     (tx, BasicSourceConfig::new_with_data(rx, data))
 }
 
@@ -43,7 +43,7 @@ pub fn basic_source_with_event_counter(
     force_shutdown: bool,
 ) -> (SourceSender, BasicSourceConfig, Arc<AtomicUsize>) {
     let event_counter = Arc::new(AtomicUsize::new(0));
-    let (tx, rx) = SourceSender::new_with_buffer(1);
+    let (tx, rx) = SourceSender::new_test_sender_with_buffer(1);
     let mut source = BasicSourceConfig::new_with_event_counter(rx, Arc::clone(&event_counter));
     source.set_force_shutdown(force_shutdown);
 
@@ -75,7 +75,7 @@ pub const fn backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig
 }
 
 pub fn basic_sink(channel_size: usize) -> (impl Stream<Item = EventArray>, BasicSinkConfig) {
-    let (tx, rx) = SourceSender::new_with_buffer(channel_size);
+    let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size);
     let sink = BasicSinkConfig::new(tx, true);
     (rx.into_stream(), sink)
 }
@@ -84,7 +84,7 @@ pub fn basic_sink_with_data(
     channel_size: usize,
     data: &str,
 ) -> (impl Stream<Item = EventArray>, BasicSinkConfig) {
-    let (tx, rx) = SourceSender::new_with_buffer(channel_size);
+    let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size);
     let sink = BasicSinkConfig::new_with_data(tx, true, data);
     (rx.into_stream(), sink)
 }
 
@@ -92,7 +92,7 @@ pub fn basic_sink_failing_healthcheck(
     channel_size: usize,
 ) -> (impl Stream<Item = EventArray>, BasicSinkConfig) {
-    let (tx, rx) = SourceSender::new_with_buffer(channel_size);
+    let (tx, rx) = SourceSender::new_test_sender_with_buffer(channel_size);
     let sink = BasicSinkConfig::new(tx, false);
     (rx.into_stream(), sink)
 }
diff --git a/src/topology/builder.rs b/src/topology/builder.rs
index 4a858acb7d113..b7ace14acd57b 100644
--- a/src/topology/builder.rs
+++ b/src/topology/builder.rs
@@ -20,6 +20,7 @@ use vector_common::internal_event::{
     self, CountByteSize, EventsSent, InternalEventHandle as _, Registered,
 };
 use vector_core::config::LogNamespace;
+use vector_core::transform::update_runtime_schema_definition;
 use vector_core::{
     buffers::{
         topology::{
@@ -242,7 +243,7 @@ impl<'a> Builder<'a> {
         let mut schema_definitions = HashMap::with_capacity(source_outputs.len());
 
         for output in source_outputs.into_iter() {
-            let mut rx = builder.add_source_output(output.clone());
+            let mut rx = builder.add_source_output(output.clone(), key.clone());
 
             let (mut fanout, control) = Fanout::new();
             let source = Arc::new(key.clone());
@@ -735,6 +736,7 @@ fn build_transform(
             node.input_details.data_type(),
             node.typetag,
             &node.key,
+            &node.outputs,
         ),
     }
 }
@@ -744,7 +746,7 @@ fn build_sync_transform(
     node: TransformNode,
     input_rx: BufferReceiver<EventArray>,
 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
-    let (outputs, controls) = TransformOutputs::new(node.outputs);
+    let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
 
     let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs);
     let transform = if node.enable_concurrency {
@@ -926,6 +928,7 @@ fn build_task_transform(
     input_type: DataType,
     typetag: &str,
     key: &ComponentKey,
+    outputs: &[TransformOutput],
 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
     let (mut fanout, control) = Fanout::new();
 
@@ -941,8 +944,30 @@ fn build_task_transform(
         ))
     });
     let events_sent = register!(EventsSent::from(internal_event::Output(None)));
+    let output_id = Arc::new(OutputId {
+        component: key.clone(),
+        port: None,
+    });
+
+    // Task transforms can only write to the default output, so only a single schema def map is needed
+    let schema_definition_map = outputs
+        .iter()
+        .find(|x| x.port.is_none())
+        .expect("output for default port required for task transforms")
+        .log_schema_definitions
+        .clone()
+        .into_iter()
+        .map(|(key, value)| (key, Arc::new(value)))
+        .collect();
+
     let stream = t
         .transform(Box::pin(filtered))
+        .map(move |mut events| {
+            for event in events.iter_events_mut() {
+                update_runtime_schema_definition(event, &output_id, &schema_definition_map);
+            }
+            events
+        })
         .inspect(move |events: &EventArray| {
             events_sent.emit(CountByteSize(
                 events.len(),
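`build_task_transform` above wraps the task transform's output stream in a `.map` that calls `update_runtime_schema_definition` on every event. The idea, sketched below with toy types (the real helper lives in `vector_core::transform` and operates on `EventMetadata`): each event arrives stamped with its upstream `OutputId`, and the transform's output carries a map from that id to the definition the event should advertise from now on.

```rust
use std::collections::HashMap;
use std::sync::Arc;

type OutputId = String;   // stand-in for vector_core's OutputId
type Definition = String; // stand-in for schema::Definition

struct Event {
    upstream_id: Arc<OutputId>,
    schema_definition: Arc<Definition>,
}

// Toy version of the per-output lookup: pick the definition registered for
// wherever this event came from, then re-stamp the event as coming from this
// transform's output.
fn update_definition(
    event: &mut Event,
    new_upstream: &Arc<OutputId>,
    definitions: &HashMap<OutputId, Arc<Definition>>,
) {
    if let Some(definition) = definitions.get(event.upstream_id.as_ref()) {
        event.schema_definition = Arc::clone(definition);
    }
    event.upstream_id = Arc::clone(new_upstream);
}

fn main() {
    let mut definitions = HashMap::new();
    definitions.insert("in".to_string(), Arc::new("definition for `in`".to_string()));

    let transform_id = Arc::new("transform".to_string());
    let mut event = Event {
        upstream_id: Arc::new("in".to_string()),
        schema_definition: Arc::new("source default".to_string()),
    };

    update_definition(&mut event, &transform_id, &definitions);
    assert_eq!(event.upstream_id.as_str(), "transform");
    assert_eq!(event.schema_definition.as_str(), "definition for `in`");
}
```

This per-event lookup is what lets a transform with several upstreams hand each event the definition matching where it actually came from, rather than one definition picked at build time, which is exactly the problem flagged in the TODO comment deleted from `remap.rs` further down.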
diff --git a/src/topology/test/compliance.rs b/src/topology/test/compliance.rs
index a716d29593998..8f4602aa1bba3 100644
--- a/src/topology/test/compliance.rs
+++ b/src/topology/test/compliance.rs
@@ -2,8 +2,10 @@ use std::sync::Arc;
 
 use tokio::sync::oneshot::{channel, Receiver};
 use vector_common::config::ComponentKey;
+use vector_core::config::OutputId;
 use vector_core::event::{Event, EventArray, EventContainer, LogEvent};
 
+use crate::config::schema::Definition;
 use crate::{
     config::{unit_test::UnitTestSourceConfig, ConfigBuilder},
     test_util::{
@@ -57,6 +59,10 @@ async fn test_function_transform_single_event() {
     assert_eq!(events.len(), 1);
 
     original_event.set_source_id(Arc::new(ComponentKey::from("in")));
+    original_event.set_upstream_id(Arc::new(OutputId::from("transform")));
+    original_event
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
 
     let event = events.remove(0);
     assert_eq!(original_event, event);
@@ -78,6 +84,10 @@ async fn test_sync_transform_single_event() {
     assert_eq!(events.len(), 1);
 
     original_event.set_source_id(Arc::new(ComponentKey::from("in")));
+    original_event.set_upstream_id(Arc::new(OutputId::from("transform")));
+    original_event
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
 
     let event = events.remove(0);
     assert_eq!(original_event, event);
@@ -98,6 +108,10 @@ async fn test_task_transform_single_event() {
     assert_eq!(events.len(), 1);
 
     original_event.set_source_id(Arc::new(ComponentKey::from("in")));
+    original_event.set_upstream_id(Arc::new(OutputId::from("transform")));
+    original_event
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
 
     let event = events.remove(0);
     assert_eq!(original_event, event);
diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs
index aa5720382e96c..b8b9c3a0fd5d0 100644
--- a/src/topology/test/mod.rs
+++ b/src/topology/test/mod.rs
@@ -7,6 +7,7 @@ use std::{
     },
 };
 
+use crate::schema::Definition;
 use crate::{
     config::{Config, ConfigDiff, SinkOuter},
     event::{into_event_stream, Event, EventArray, EventContainer, LogEvent},
@@ -27,6 +28,7 @@ use tokio::{
 };
 use vector_buffers::{BufferConfig, BufferType, WhenFull};
 use vector_common::config::ComponentKey;
+use vector_core::config::OutputId;
 
 mod backpressure;
 mod compliance;
@@ -149,6 +151,10 @@ async fn topology_source_and_sink() {
     let res = out1.flat_map(into_event_stream).collect::<Vec<_>>().await;
 
     event.set_source_id(Arc::new(ComponentKey::from("in1")));
+    event.set_upstream_id(Arc::new(OutputId::from("test")));
+    event
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
 
     assert_eq!(vec![event], res);
 }
@@ -184,6 +190,16 @@ async fn topology_multiple_sources() {
     event1.set_source_id(Arc::new(ComponentKey::from("in1")));
     event2.set_source_id(Arc::new(ComponentKey::from("in2")));
 
+    event1.set_upstream_id(Arc::new(OutputId::from("test")));
+    event1
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
+
+    event2.set_upstream_id(Arc::new(OutputId::from("test")));
+    event2
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
+
     assert_eq!(out_event1, Some(event1.into()));
     assert_eq!(out_event2, Some(event2.into()));
 }
@@ -218,6 +234,12 @@ async fn topology_multiple_sinks() {
     // We should see that both sinks got the exact same event:
     event.set_source_id(Arc::new(ComponentKey::from("in1")));
+
+    event.set_upstream_id(Arc::new(OutputId::from("test")));
+    event
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
+
     let expected = vec![event];
     assert_eq!(expected, res1);
     assert_eq!(expected, res2);
@@ -293,6 +315,11 @@ async fn topology_remove_one_source() {
     event1.set_source_id(Arc::new(ComponentKey::from("in1")));
 
+    event1.set_upstream_id(Arc::new(OutputId::from("test")));
+    event1
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
+
     let res = h_out1.await.unwrap();
     assert_eq!(vec![event1], res);
 }
@@ -332,6 +359,11 @@ async fn topology_remove_one_sink() {
     event.set_source_id(Arc::new(ComponentKey::from("in1")));
 
+    event.set_upstream_id(Arc::new(OutputId::from("test")));
+    event
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
+
     assert_eq!(vec![event], res1);
     assert_eq!(Vec::<Event>::new(), res2);
 }
@@ -442,6 +474,11 @@ async fn topology_swap_source() {
     assert_eq!(Vec::<Event>::new(), res1);
event2.set_source_id(Arc::new(ComponentKey::from("in2"))); + event2.set_upstream_id(Arc::new(OutputId::from("test"))); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event2], res2); } @@ -554,6 +591,10 @@ async fn topology_swap_sink() { assert_eq!(Vec::::new(), res1); event1.set_source_id(Arc::new(ComponentKey::from("in1"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event1], res2); } @@ -663,6 +704,15 @@ async fn topology_rebuild_connected() { event1.set_source_id(Arc::new(ComponentKey::from("in1"))); event2.set_source_id(Arc::new(ComponentKey::from("in1"))); + event1.set_upstream_id(Arc::new(OutputId::from("test"))); + event2.set_upstream_id(Arc::new(OutputId::from("test"))); + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(vec![event1, event2], res); } @@ -715,6 +765,10 @@ async fn topology_rebuild_connected_transform() { assert_eq!(Vec::::new(), res1); event.set_source_id(Arc::new(ComponentKey::from("in1"))); + event.set_upstream_id(Arc::new(OutputId::from("test"))); + event + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(vec![event], res2); } diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index a591305764df1..ca5a7ae8679cb 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -156,8 +156,10 @@ mod tests { use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use vector_common::config::ComponentKey; + use vrl::value::Kind; use super::*; + use crate::schema::Definition; use crate::{ event::{metric, Event, Metric}, test_util::components::assert_transform_compliance, @@ -174,8 +176,13 @@ mod tests { kind: metric::MetricKind, value: metric::MetricValue, ) -> Event { - Event::Metric(Metric::new(name, kind, value)) + let mut event = Event::Metric(Metric::new(name, kind, value)) .with_source_id(Arc::new(ComponentKey::from("in"))) + .with_upstream_id(Arc::new(OutputId::from("transform"))); + event.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event } #[test] diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index 4a6497628d78a..513a91ce9115e 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -289,7 +289,9 @@ mod tests { use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use vector_common::config::ComponentKey; + use vector_core::config::OutputId; + use crate::config::schema::Definition; use crate::{ event::{Event, LogEvent, Value}, test_util::components::assert_transform_compliance, @@ -363,6 +365,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event differs in matched field so should be output even though it @@ -371,6 +378,11 @@ mod tests { let new_event = out.recv().await.unwrap(); 
event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); // Third event has the same value for "matched" as first event, so it should be dropped. @@ -413,6 +425,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event has a different matched field name with the same value, @@ -421,6 +438,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -466,6 +488,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event is the same just with different field order, so it @@ -511,6 +538,12 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event gets output because it's not a dupe. 
This causes the first @@ -519,6 +552,12 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(new_event, event2); // Third event is a dupe but gets output anyway because the first @@ -568,6 +607,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -576,6 +620,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -621,6 +670,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through even though the string @@ -629,6 +683,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); @@ -667,6 +726,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event1.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event1 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event1); // Second event should also get passed through as null is different than @@ -675,6 +739,11 @@ mod tests { let new_event = out.recv().await.unwrap(); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + // the schema definition is copied from the source for dedupe + event2 + .metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); assert_eq!(new_event, event2); drop(tx); diff --git a/src/transforms/filter.rs b/src/transforms/filter.rs index 95e8877bee255..e14f0c7347ab7 100644 --- a/src/transforms/filter.rs +++ b/src/transforms/filter.rs @@ -104,6 +104,7 @@ mod test { use vector_core::event::{Metric, MetricKind, MetricValue}; use super::*; + use crate::config::schema::Definition; use crate::{ conditions::ConditionConfig, event::{Event, LogEvent}, @@ -129,6 +130,10 @@ mod test { 
tx.send(log.clone()).await.unwrap(); log.set_source_id(Arc::new(ComponentKey::from("in"))); + log.set_upstream_id(Arc::new(OutputId::from("transform"))); + log.metadata_mut() + .set_schema_definition(&Arc::new(Definition::default_legacy_namespace())); + assert_eq!(out.recv().await.unwrap(), log); let metric = Event::from(Metric::new( diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index ad44b0a9e6d55..cb99cb186de8b 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, num::ParseFloatError}; use chrono::Utc; @@ -5,6 +6,7 @@ use indexmap::IndexMap; use vector_config::configurable_component; use vector_core::config::LogNamespace; +use crate::config::schema::Definition; use crate::{ config::{ DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, @@ -256,7 +258,10 @@ fn to_metric(config: &MetricConfig, event: &Event) -> Result Vec { let log_namespace = global_log_namespace.merge(self.log_namespace); - let mut schema_definition = - Definition::default_for_namespace(&BTreeSet::from([log_namespace])) - .with_event_field(&owned_value_path!("name"), Kind::bytes(), None) - .with_event_field( - &owned_value_path!("namespace"), - Kind::bytes().or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("tags"), - Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), - None, - ) - .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None) - .with_event_field( - &owned_value_path!("counter"), - Kind::object(Collection::empty().with_known("value", Kind::float())) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("gauge"), - Kind::object(Collection::empty().with_known("value", Kind::float())) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("set"), - Kind::object(Collection::empty().with_known( - "values", - Kind::array(Collection::empty().with_unknown(Kind::bytes())), - )) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("distribution"), - Kind::object( - Collection::empty() - .with_known( - "samples", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("value", Kind::float()) - .with_known("rate", Kind::integer()), - )), - ), - ) - .with_known("statistic", Kind::bytes()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("aggregated_histogram"), - Kind::object( - Collection::empty() - .with_known( - "buckets", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("upper_limit", Kind::float()) - .with_known("count", Kind::integer()), - )), - ), - ) - .with_known("count", Kind::integer()) - .with_known("sum", Kind::float()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("aggregated_summary"), - Kind::object( - Collection::empty() - .with_known( - "quantiles", - Kind::array( - Collection::empty().with_unknown(Kind::object( - Collection::empty() - .with_known("quantile", Kind::float()) - .with_known("value", Kind::float()), - )), - ), - ) - .with_known("count", Kind::integer()) - .with_known("sum", Kind::float()), - ) - .or_undefined(), - None, - ) - .with_event_field( - &owned_value_path!("sketch"), - Kind::any().or_undefined(), - None, - ); - - match log_namespace { - LogNamespace::Vector => { - // from serializing the Metric (Legacy moves it to another field) - schema_definition = 
schema_definition.with_event_field( - &owned_value_path!("timestamp"), - Kind::bytes().or_undefined(), - None, - ); - - // This is added as a "marker" field to determine which namespace is being used at runtime. - // This is normally handled automatically by sources, but this is a special case. - schema_definition = schema_definition.with_metadata_field( - &owned_value_path!("vector"), - Kind::object(Collection::empty()), - None, - ); - } - LogNamespace::Legacy => { - if let Some(timestamp_key) = log_schema().timestamp_key() { - schema_definition = - schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None); - } - - schema_definition = schema_definition.with_event_field( - &parse_value_path(log_schema().host_key()).expect("valid host key"), - Kind::bytes().or_undefined(), - None, - ); - } - } + let schema_definition = schema_definition(log_namespace); vec![TransformOutput::new( DataType::Log, @@ -249,6 +120,137 @@ impl TransformConfig for MetricToLogConfig { } } +fn schema_definition(log_namespace: LogNamespace) -> Definition { + let mut schema_definition = Definition::default_for_namespace(&BTreeSet::from([log_namespace])) + .with_event_field(&owned_value_path!("name"), Kind::bytes(), None) + .with_event_field( + &owned_value_path!("namespace"), + Kind::bytes().or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("tags"), + Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), + None, + ) + .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None) + .with_event_field( + &owned_value_path!("counter"), + Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("gauge"), + Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("set"), + Kind::object(Collection::empty().with_known( + "values", + Kind::array(Collection::empty().with_unknown(Kind::bytes())), + )) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("distribution"), + Kind::object( + Collection::empty() + .with_known( + "samples", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("value", Kind::float()) + .with_known("rate", Kind::integer()), + )), + ), + ) + .with_known("statistic", Kind::bytes()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("aggregated_histogram"), + Kind::object( + Collection::empty() + .with_known( + "buckets", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("upper_limit", Kind::float()) + .with_known("count", Kind::integer()), + )), + ), + ) + .with_known("count", Kind::integer()) + .with_known("sum", Kind::float()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("aggregated_summary"), + Kind::object( + Collection::empty() + .with_known( + "quantiles", + Kind::array( + Collection::empty().with_unknown(Kind::object( + Collection::empty() + .with_known("quantile", Kind::float()) + .with_known("value", Kind::float()), + )), + ), + ) + .with_known("count", Kind::integer()) + .with_known("sum", Kind::float()), + ) + .or_undefined(), + None, + ) + .with_event_field( + &owned_value_path!("sketch"), + Kind::any().or_undefined(), + None, + ); + + match log_namespace { + LogNamespace::Vector => { + // from serializing the Metric (Legacy moves it to another field) + schema_definition = 
schema_definition.with_event_field( + &owned_value_path!("timestamp"), + Kind::bytes().or_undefined(), + None, + ); + + // This is added as a "marker" field to determine which namespace is being used at runtime. + // This is normally handled automatically by sources, but this is a special case. + schema_definition = schema_definition.with_metadata_field( + &owned_value_path!("vector"), + Kind::object(Collection::empty()), + None, + ); + } + LogNamespace::Legacy => { + if let Some(timestamp_key) = log_schema().timestamp_key() { + schema_definition = + schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None); + } + + schema_definition = schema_definition.with_event_field( + &parse_value_path(log_schema().host_key()).expect("valid host key"), + Kind::bytes().or_undefined(), + None, + ); + } + } + schema_definition +} + #[derive(Clone, Debug)] pub struct MetricToLog { host_tag: String, @@ -412,6 +414,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = counter.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(counter).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -440,6 +444,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = gauge.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(gauge).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -468,6 +474,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = set.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(set).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -498,6 +506,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = distro.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(distro).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -547,6 +557,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = histo.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(histo).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); @@ -594,6 +606,8 @@ mod tests { .with_timestamp(Some(ts())); let mut metadata = summary.metadata().clone(); metadata.set_source_id(Arc::new(ComponentKey::from("in"))); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy))); let log = do_transform(summary).await.unwrap(); let collected: Vec<_> = log.all_fields().unwrap().collect(); diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index 
455a4b142e4d6..90c9294b0cb63 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -26,6 +26,7 @@ use crate::{ mod merge_strategy; +use crate::config::schema::Definition; use crate::event::Value; pub use merge_strategy::*; use vector_core::config::LogNamespace; @@ -133,94 +134,101 @@ impl TransformConfig for ReduceConfig { input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { - let mut output_definitions = HashMap::new(); - - for (output, input) in input_definitions { - let mut schema_definition = input.clone(); - - for (key, merge_strategy) in self.merge_strategies.iter() { - let key = if let Ok(key) = parse_target_path(key) { - key - } else { - continue; - }; - - let input_kind = match key.prefix { - PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), - PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), - }; - - let new_kind = match merge_strategy { - MergeStrategy::Discard | MergeStrategy::Retain => { - /* does not change the type */ - input_kind.clone() + // Events may be combined, so there isn't a true single "source" for events. + // All of the definitions must be merged. + let merged_definition: Definition = input_definitions + .iter() + .map(|(_output, definition)| definition.clone()) + .reduce(Definition::merge) + .unwrap_or_else(Definition::any); + + let mut schema_definition = merged_definition; + + for (key, merge_strategy) in self.merge_strategies.iter() { + let key = if let Ok(key) = parse_target_path(key) { + key + } else { + continue; + }; + + let input_kind = match key.prefix { + PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), + PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), + }; + + let new_kind = match merge_strategy { + MergeStrategy::Discard | MergeStrategy::Retain => { + /* does not change the type */ + input_kind.clone() + } + MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => { + // only keeps integer / float values + match (input_kind.contains_integer(), input_kind.contains_float()) { + (true, true) => Kind::float().or_integer(), + (true, false) => Kind::integer(), + (false, true) => Kind::float(), + (false, false) => Kind::undefined(), } - MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => { - // only keeps integer / float values - match (input_kind.contains_integer(), input_kind.contains_float()) { - (true, true) => Kind::float().or_integer(), - (true, false) => Kind::integer(), - (false, true) => Kind::float(), - (false, false) => Kind::undefined(), - } + } + MergeStrategy::Array => { + let unknown_kind = input_kind.clone(); + Kind::array(Collection::empty().with_unknown(unknown_kind)) + } + MergeStrategy::Concat => { + let mut new_kind = Kind::never(); + + if input_kind.contains_bytes() { + new_kind.add_bytes(); } - MergeStrategy::Array => { - let unknown_kind = input_kind.clone(); - Kind::array(Collection::empty().with_unknown(unknown_kind)) + if let Some(array) = input_kind.as_array() { + // array elements can be either any type that the field can be, or any + // element of the array + let array_elements = array.reduced_kind().union(input_kind.without_array()); + new_kind.add_array(Collection::empty().with_unknown(array_elements)); } - MergeStrategy::Concat => { - let mut new_kind = Kind::never(); - - if input_kind.contains_bytes() { - new_kind.add_bytes(); - } - if let Some(array) = input_kind.as_array() { - // array elements can be either any type that the field can be, or any - 
// element of the array - let array_elements = - array.reduced_kind().union(input_kind.without_array()); - new_kind.add_array(Collection::empty().with_unknown(array_elements)); - } - new_kind + new_kind + } + MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => { + // can only produce bytes (or undefined) + if input_kind.contains_bytes() { + Kind::bytes() + } else { + Kind::undefined() } - MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => { - // can only produce bytes (or undefined) - if input_kind.contains_bytes() { - Kind::bytes() - } else { - Kind::undefined() - } + } + MergeStrategy::ShortestArray | MergeStrategy::LongestArray => { + if let Some(array) = input_kind.as_array() { + Kind::array(array.clone()) + } else { + Kind::undefined() } - MergeStrategy::ShortestArray | MergeStrategy::LongestArray => { - if let Some(array) = input_kind.as_array() { - Kind::array(array.clone()) - } else { - Kind::undefined() - } + } + MergeStrategy::FlatUnique => { + let mut array_elements = input_kind.without_array().without_object(); + if let Some(array) = input_kind.as_array() { + array_elements = array_elements.union(array.reduced_kind()); } - MergeStrategy::FlatUnique => { - let mut array_elements = input_kind.without_array().without_object(); - if let Some(array) = input_kind.as_array() { - array_elements = array_elements.union(array.reduced_kind()); - } - if let Some(object) = input_kind.as_object() { - array_elements = array_elements.union(object.reduced_kind()); - } - Kind::array(Collection::empty().with_unknown(array_elements)) + if let Some(object) = input_kind.as_object() { + array_elements = array_elements.union(object.reduced_kind()); } - }; + Kind::array(Collection::empty().with_unknown(array_elements)) + } + }; - // all of the merge strategies are optional. They won't produce a value unless a value actually exists - let new_kind = if input_kind.contains_undefined() { - new_kind.or_undefined() - } else { - new_kind - }; + // all of the merge strategies are optional. 
They won't produce a value unless a value actually exists + let new_kind = if input_kind.contains_undefined() { + new_kind.or_undefined() + } else { + new_kind + }; - schema_definition = schema_definition.with_field(&key, new_kind, None); - } + schema_definition = schema_definition.with_field(&key, new_kind, None); + } - output_definitions.insert(output.clone(), schema_definition); + // the same schema definition is used for all inputs + let mut output_definitions = HashMap::new(); + for (output, _input) in input_definitions { + output_definitions.insert(output.clone(), schema_definition.clone()); } vec![TransformOutput::new(DataType::Log, output_definitions)] @@ -474,12 +482,15 @@ impl TaskTransform for Reduce { #[cfg(test)] mod test { + use enrichment::TableRegistry; use serde_json::json; + use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use vrl::value::Kind; use super::*; + use crate::config::schema::Definition; use crate::event::{LogEvent, Value}; use crate::test_util::components::assert_transform_compliance; use crate::transforms::test::create_topology; @@ -528,18 +539,33 @@ group_by = [ "request_id" ] .schema_definitions(true) .clone(); + let new_schema_definition = reduce_config.outputs( + TableRegistry::default(), + &[(OutputId::from("in"), Definition::default_legacy_namespace())], + LogNamespace::Legacy, + )[0] + .clone() + .log_schema_definitions + .get(&OutputId::from("in")) + .unwrap() + .clone(); + let (tx, rx) = mpsc::channel(1); let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; let mut e_1 = LogEvent::from("test message 1"); e_1.insert("counter", 1); e_1.insert("request_id", "1"); - let metadata_1 = e_1.metadata().clone(); + let mut metadata_1 = e_1.metadata().clone(); + metadata_1.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone())); let mut e_2 = LogEvent::from("test message 2"); e_2.insert("counter", 2); e_2.insert("request_id", "2"); - let metadata_2 = e_2.metadata().clone(); + let mut metadata_2 = e_2.metadata().clone(); + metadata_2.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone())); let mut e_3 = LogEvent::from("test message 3"); e_3.insert("counter", 3); @@ -603,6 +629,18 @@ merge_strategies.baz = "max" assert_transform_compliance(async move { let (tx, rx) = mpsc::channel(1); + + let new_schema_definition = reduce_config.outputs( + TableRegistry::default(), + &[(OutputId::from("in"), Definition::default_legacy_namespace())], + LogNamespace::Legacy, + )[0] + .clone() + .log_schema_definitions + .get(&OutputId::from("in")) + .unwrap() + .clone(); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; let mut e_1 = LogEvent::from("test message 1"); @@ -610,7 +648,9 @@ merge_strategies.baz = "max" e_1.insert("bar", "first bar"); e_1.insert("baz", 2); e_1.insert("request_id", "1"); - let metadata = e_1.metadata().clone(); + let mut metadata = e_1.metadata().clone(); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_schema_definition(&Arc::new(new_schema_definition.clone())); tx.send(e_1.into()).await.unwrap(); let mut e_2 = LogEvent::from("test message 2"); @@ -660,17 +700,32 @@ group_by = [ "request_id" ] assert_transform_compliance(async move { let (tx, rx) = mpsc::channel(1); + let new_schema_definition = reduce_config.outputs( + TableRegistry::default(), + 
&[(OutputId::from("in"), Definition::default_legacy_namespace())], + LogNamespace::Legacy, + )[0] + .clone() + .log_schema_definitions + .get(&OutputId::from("in")) + .unwrap() + .clone(); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; let mut e_1 = LogEvent::from("test message 1"); e_1.insert("counter", 1); e_1.insert("request_id", "1"); - let metadata_1 = e_1.metadata().clone(); + let mut metadata_1 = e_1.metadata().clone(); + metadata_1.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone())); tx.send(e_1.into()).await.unwrap(); let mut e_2 = LogEvent::from("test message 2"); e_2.insert("counter", 2); - let metadata_2 = e_2.metadata().clone(); + let mut metadata_2 = e_2.metadata().clone(); + metadata_2.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata_2.set_schema_definition(&Arc::new(new_schema_definition)); tx.send(e_2.into()).await.unwrap(); let mut e_3 = LogEvent::from("test message 3"); @@ -852,20 +907,37 @@ merge_strategies.bar = "concat" assert_transform_compliance(async move { let (tx, rx) = mpsc::channel(1); + + let new_schema_definition = reduce_config.outputs( + TableRegistry::default(), + &[(OutputId::from("in"), Definition::default_legacy_namespace())], + LogNamespace::Legacy, + )[0] + .clone() + .log_schema_definitions + .get(&OutputId::from("in")) + .unwrap() + .clone(); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; let mut e_1 = LogEvent::from("test message 1"); e_1.insert("foo", json!([1, 3])); e_1.insert("bar", json!([1, 3])); e_1.insert("request_id", "1"); - let metadata_1 = e_1.metadata().clone(); + let mut metadata_1 = e_1.metadata().clone(); + metadata_1.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone())); + tx.send(e_1.into()).await.unwrap(); let mut e_2 = LogEvent::from("test message 2"); e_2.insert("foo", json!([2, 4])); e_2.insert("bar", json!([2, 4])); e_2.insert("request_id", "2"); - let metadata_2 = e_2.metadata().clone(); + let mut metadata_2 = e_2.metadata().clone(); + metadata_2.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata_2.set_schema_definition(&Arc::new(new_schema_definition)); tx.send(e_2.into()).await.unwrap(); let mut e_3 = LogEvent::from("test message 3"); diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index a21c135919b28..d718c89f5560d 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::sync::Arc; use std::{ collections::BTreeMap, fs::File, @@ -346,8 +345,6 @@ where drop_on_error: bool, drop_on_abort: bool, reroute_dropped: bool, - default_schema_definition: Arc, - dropped_schema_definition: Arc, runner: Runner, metric_tag_values: MetricTagValues, } @@ -414,28 +411,6 @@ where program: Program, runner: Runner, ) -> crate::Result { - let default_schema_definition = context - .schema_definitions - .get(&None) - .expect("default schema required") - // TODO we can now have multiple possible definitions. - // This is going to need to be updated to store these possible definitions and then - // choose the correct one based on the input the event has come from. 
- .iter() - .map(|(_output, definition)| definition.clone()) - .next() - .unwrap_or_else(Definition::any); - - let dropped_schema_definition = context - .schema_definitions - .get(&Some(DROPPED.to_owned())) - .or_else(|| context.schema_definitions.get(&None)) - .expect("dropped schema required") - .iter() - .map(|(_output, definition)| definition.clone()) - .next() - .unwrap_or_else(Definition::any); - Ok(Remap { component_key: context.key.clone(), program, @@ -445,8 +420,6 @@ where drop_on_error: config.drop_on_error, drop_on_abort: config.drop_on_abort, reroute_dropped: config.reroute_dropped, - default_schema_definition: Arc::new(default_schema_definition), - dropped_schema_definition: Arc::new(dropped_schema_definition), runner, metric_tag_values: config.metric_tag_values, }) @@ -562,13 +535,11 @@ where match result { Ok(_) => match target.into_events(log_namespace) { - TargetEvents::One(event) => { - push_default(event, output, &self.default_schema_definition) + TargetEvents::One(event) => push_default(event, output), + TargetEvents::Logs(events) => events.for_each(|event| push_default(event, output)), + TargetEvents::Traces(events) => { + events.for_each(|event| push_default(event, output)) } - TargetEvents::Logs(events) => events - .for_each(|event| push_default(event, output, &self.default_schema_definition)), - TargetEvents::Traces(events) => events - .for_each(|event| push_default(event, output, &self.default_schema_definition)), }, Err(reason) => { let (reason, error, drop) = match reason { @@ -592,12 +563,12 @@ where if !drop { let event = original_event.expect("event will be set"); - push_default(event, output, &self.default_schema_definition); + push_default(event, output); } else if self.reroute_dropped { let mut event = original_event.expect("event will be set"); self.annotate_dropped(&mut event, reason, error); - push_dropped(event, output, &self.dropped_schema_definition); + push_dropped(event, output); } } } @@ -605,29 +576,13 @@ where } #[inline] -fn push_default( - mut event: Event, - output: &mut TransformOutputsBuf, - schema_definition: &Arc, -) { - event - .metadata_mut() - .set_schema_definition(schema_definition); - - output.push(event) +fn push_default(event: Event, output: &mut TransformOutputsBuf) { + output.push(None, event) } #[inline] -fn push_dropped( - mut event: Event, - output: &mut TransformOutputsBuf, - schema_definition: &Arc, -) { - event - .metadata_mut() - .set_schema_definition(schema_definition); - - output.push_named(DROPPED, event) +fn push_dropped(event: Event, output: &mut TransformOutputsBuf) { + output.push(Some(DROPPED), event); } #[derive(Debug, Snafu)] @@ -644,6 +599,7 @@ pub enum BuildError { #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; + use std::sync::Arc; use indoc::{formatdoc, indoc}; use vector_core::{config::GlobalOptions, event::EventMetadata, metric_tags}; @@ -765,10 +721,6 @@ mod tests { let result1 = transform_one(&mut tform, event1).unwrap(); assert_eq!(get_field_string(&result1, "message"), "event1"); assert_eq!(get_field_string(&result1, "foo"), "bar"); - assert_eq!( - result1.metadata().schema_definition(), - &test_default_schema_definition() - ); assert!(tform.runner().runtime.is_empty()); let event2 = { @@ -778,10 +730,6 @@ mod tests { let result2 = transform_one(&mut tform, event2).unwrap(); assert_eq!(get_field_string(&result2, "message"), "event2"); assert_eq!(result2.as_log().get("foo"), Some(&Value::Null)); - assert_eq!( - result2.metadata().schema_definition(), - 
&test_default_schema_definition() - ); assert!(tform.runner().runtime.is_empty()); } @@ -856,11 +804,6 @@ mod tests { assert_eq!(get_field_string(&result, "foo"), "bar"); assert_eq!(get_field_string(&result, "bar"), "baz"); assert_eq!(get_field_string(&result, "copy"), "buz"); - - assert_eq!( - result.metadata().schema_definition(), - &test_default_schema_definition() - ); } #[test] @@ -894,17 +837,8 @@ mod tests { let r = result.next().unwrap(); assert_eq!(get_field_string(&r, "message"), "foo"); - assert_eq!( - r.metadata().schema_definition(), - &test_default_schema_definition() - ); let r = result.next().unwrap(); assert_eq!(get_field_string(&r, "message"), "bar"); - - assert_eq!( - r.metadata().schema_definition(), - &test_default_schema_definition() - ); } #[test] @@ -1070,7 +1004,9 @@ mod tests { "zork", MetricKind::Incremental, MetricValue::Counter { value: 1.0 }, - metadata.with_schema_definition(&Arc::new(test_default_schema_definition())), + // The schema definition is set in the topology, which isn't used in this test. Setting the definition + // to the actual value to skip the assertion here + metadata ) .with_namespace(Some("zerk")) .with_tags(Some(metric_tags! { @@ -1280,8 +1216,11 @@ mod tests { "counter", MetricKind::Absolute, MetricValue::Counter { value: 1.0 }, - EventMetadata::default() - .with_schema_definition(&Arc::new(test_default_schema_definition())), + // The schema definition is set in the topology, which isn't used in this test. Setting the definition + // to the actual value to skip the assertion here + EventMetadata::default().with_schema_definition(&Arc::new( + output.metadata().schema_definition().clone() + )), ) .with_tags(Some(metric_tags! { "hello" => "world", @@ -1298,8 +1237,11 @@ mod tests { "counter", MetricKind::Absolute, MetricValue::Counter { value: 1.0 }, - EventMetadata::default() - .with_schema_definition(&Arc::new(test_dropped_schema_definition())), + // The schema definition is set in the topology, which isn't used in this test. Setting the definition + // to the actual value to skip the assertion here + EventMetadata::default().with_schema_definition(&Arc::new( + output.metadata().schema_definition().clone() + )), ) .with_tags(Some(metric_tags! { "hello" => "goodbye", @@ -1319,8 +1261,11 @@ mod tests { "counter", MetricKind::Absolute, MetricValue::Counter { value: 1.0 }, - EventMetadata::default() - .with_schema_definition(&Arc::new(test_dropped_schema_definition())), + // The schema definition is set in the topology, which isn't used in this test. Setting the definition + // to the actual value to skip the assertion here + EventMetadata::default().with_schema_definition(&Arc::new( + output.metadata().schema_definition().clone() + )), ) .with_tags(Some(metric_tags! 
{ "not_hello" => "oops", diff --git a/src/transforms/route.rs b/src/transforms/route.rs index adcac43ff504c..e410277914a8f 100644 --- a/src/transforms/route.rs +++ b/src/transforms/route.rs @@ -42,13 +42,13 @@ impl SyncTransform for Route { for (output_name, condition) in &self.conditions { let (result, event) = condition.check(event.clone()); if result { - output.push_named(output_name, event); + output.push(Some(output_name), event); } else { check_failed += 1; } } if check_failed == self.conditions.len() { - output.push_named(UNMATCHED_ROUTE, event); + output.push(Some(UNMATCHED_ROUTE), event); } } } diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index 8488658e8ea55..5753d0176dd3b 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1,9 +1,12 @@ use std::sync::Arc; use vector_common::config::ComponentKey; +use vector_core::config::OutputId; use vector_core::metric_tags; use super::*; +use crate::config::schema::Definition; +use crate::config::LogNamespace; use crate::event::metric::TagValue; use crate::event::{metric, Event, Metric, MetricTags}; use crate::test_util::components::assert_transform_compliance; @@ -13,6 +16,7 @@ use crate::transforms::tag_cardinality_limit::config::{ use crate::transforms::test::create_topology; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use vrl::compiler::prelude::Kind; #[test] fn generate_config() { @@ -88,6 +92,16 @@ async fn drop_event(config: TagCardinalityLimitConfig) { event1.set_source_id(Arc::new(ComponentKey::from("in"))); event2.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + + event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); // Third value rejected since value_limit is 2. 
@@ -135,6 +149,20 @@ async fn drop_tag(config: TagCardinalityLimitConfig) { event2.set_source_id(Arc::new(ComponentKey::from("in"))); event3.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + event3.set_upstream_id(Arc::new(OutputId::from("transform"))); + + event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event3.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); // The third event should have been modified to remove "tag1" @@ -207,6 +235,21 @@ async fn drop_tag_multi_value(config: TagCardinalityLimitConfig) { event2.set_source_id(Arc::new(ComponentKey::from("in"))); event3.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + event3.set_upstream_id(Arc::new(OutputId::from("transform"))); + + // definitions aren't valid for metrics yet, it's just set to the default (anything). + event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event3.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + drop(tx); topology.stop().await; @@ -257,6 +300,21 @@ async fn separate_value_limit_per_tag(config: TagCardinalityLimitConfig) { event2.set_source_id(Arc::new(ComponentKey::from("in"))); event3.set_source_id(Arc::new(ComponentKey::from("in"))); + event1.set_upstream_id(Arc::new(OutputId::from("transform"))); + event2.set_upstream_id(Arc::new(OutputId::from("transform"))); + event3.set_upstream_id(Arc::new(OutputId::from("transform"))); + + // definitions aren't valid for metrics yet, it's just set to the default (anything). + event1.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event2.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + event3.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + assert_eq!(new_event1, Some(event1)); assert_eq!(new_event2, Some(event2)); assert_eq!(new_event3, Some(event3));
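One detail from earlier in this diff deserves a closing note: `ReduceConfig::outputs` now folds all input definitions into a single merged definition, because a reduced event may combine events from any input, so no single upstream definition applies. A toy model of that fold (a stand-in `Definition` whose `merge` is set union; the real type merges per-field `Kind` information):

```rust
use std::collections::BTreeSet;

#[derive(Debug, Clone, PartialEq)]
struct Definition {
    fields: BTreeSet<String>, // stand-in for per-field Kind information
}

impl Definition {
    // Union of what either input may contain.
    fn merge(self, other: Self) -> Self {
        Definition { fields: &self.fields | &other.fields }
    }

    // Most permissive fallback when there are no inputs at all.
    fn any() -> Self {
        Definition { fields: BTreeSet::from(["anything".to_string()]) }
    }
}

// Mirror of the fold in `ReduceConfig::outputs`:
// inputs.iter().map(...).reduce(Definition::merge).unwrap_or_else(Definition::any)
fn merged_definition(inputs: &[Definition]) -> Definition {
    inputs
        .iter()
        .cloned()
        .reduce(Definition::merge)
        .unwrap_or_else(Definition::any)
}

fn main() {
    let a = Definition { fields: BTreeSet::from(["message".to_string()]) };
    let b = Definition { fields: BTreeSet::from(["counter".to_string()]) };
    let merged = merged_definition(&[a, b]);
    assert_eq!(merged.fields.len(), 2);

    assert_eq!(merged_definition(&[]), Definition::any());
}
```

The diff then inserts this one merged definition into the output map once per input `OutputId`, so every downstream lookup resolves to the same schema, which is what the "the same schema definition is used for all inputs" comment in the reduce hunk is pointing at.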