Skip to content

Commit

Permalink
Merge pull request #321 from christophd/issue/fix-aws2-ddb-streams-so…
Browse files Browse the repository at this point in the history
…urce-kamelet

CMLK-1907: Fix AWS DDB Streams Source Kamelet
  • Loading branch information
christophd authored May 2, 2024
2 parents 93f7cad + 1686002 commit 0524fdf
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 12 deletions.
53 changes: 47 additions & 6 deletions aws-ddb-streams-source.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ spec:
enum: ["ap-south-1", "eu-south-1", "us-gov-east-1", "me-central-1", "ca-central-1", "eu-central-1", "us-iso-west-1", "us-west-1", "us-west-2", "af-south-1", "eu-north-1", "eu-west-3", "eu-west-2", "eu-west-1", "ap-northeast-3", "ap-northeast-2", "ap-northeast-1", "me-south-1", "sa-east-1", "ap-east-1", "cn-north-1", "us-gov-west-1", "ap-southeast-1", "ap-southeast-2", "us-iso-east-1", "ap-southeast-3", "us-east-1", "us-east-2", "cn-northwest-1", "us-isob-east-1", "aws-global", "aws-cn-global", "aws-us-gov-global", "aws-iso-global", "aws-iso-b-global"]
streamIteratorType:
title: Stream Iterator Type
description: Defines where in the DynamoDB stream to start getting records. There are two enums and the value can be one of FROM_LATEST and FROM_START. Note that using FROM_START can cause a significant delay before the stream has caught up to real-time.
description: Defines where in the DynamoDB stream to start getting records. There are two enums and the value can be one of FROM_LATEST and FROM_START. Note that using FROM_START can cause a significant delay before the stream has caught up to real-time.
type: string
default: FROM_LATEST
useDefaultCredentialsProvider:
Expand All @@ -95,10 +95,52 @@ spec:
description: The number of milliseconds before the next poll from the database.
type: integer
default: 500
types:
dataTypes:
out:
mediaType: application/json
default: json
headers:
CamelAwsDdbStreamEventSource:
title: The DDB Stream Event Source
description: The Amazon Web Services service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb.
type: string
CamelAwsDdbStreamEventId:
title: The DDB Stream Event Id
description: A globally unique identifier for the event that was recorded in this stream record.
type: string
types:
json:
format: "application-json"
description: Default Json representation of a DDB Stream Event.
mediaType: application/json
cloudevents:
format: "aws2-ddbstream:application-cloudevents"
description: |-
Data type transformer converts AWS Dynamo DB Streams get records response to CloudEvent v1_0 data format. The data
type sets Camel specific CloudEvent headers with values extracted from AWS Dynamo DB Streams get records.
headers:
CamelCloudEventID:
title: CloudEvent ID
description: The Camel exchange id set as event id
type: string
CamelCloudEventType:
title: CloudEvent Type
description: The event type
default: "org.apache.camel.event.aws.ddbstream.getRecords"
type: string
CamelCloudEventSource:
title: CloudEvent Source
description: The event source. By default, the DDB Stream Event source receipt handle with prefix "aws.ddbstream.".
type: string
CamelCloudEventSubject:
title: CloudEvent Subject
description: The event subject. The DDB Stream Event Id.
type: string
CamelCloudEventTime:
title: CloudEvent Time
description: The exchange creation timestamp as event time.
type: string
dependencies:
- mvn:org.apache.camel.kamelets:camel-kamelets-utils:2.3.0
- "camel:gson"
- "camel:aws2-ddb"
- "camel:kamelet"
Expand All @@ -115,7 +157,6 @@ spec:
overrideEndpoint: "{{overrideEndpoint}}"
delay: "{{delay}}"
steps:
- marshal:
json:
library: Gson
- transform:
toType: "aws2-ddb:application-x-struct"
- to: "kamelet:sink"
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ spec:
enum: ["ap-south-1", "eu-south-1", "us-gov-east-1", "me-central-1", "ca-central-1", "eu-central-1", "us-iso-west-1", "us-west-1", "us-west-2", "af-south-1", "eu-north-1", "eu-west-3", "eu-west-2", "eu-west-1", "ap-northeast-3", "ap-northeast-2", "ap-northeast-1", "me-south-1", "sa-east-1", "ap-east-1", "cn-north-1", "us-gov-west-1", "ap-southeast-1", "ap-southeast-2", "us-iso-east-1", "ap-southeast-3", "us-east-1", "us-east-2", "cn-northwest-1", "us-isob-east-1", "aws-global", "aws-cn-global", "aws-us-gov-global", "aws-iso-global", "aws-iso-b-global"]
streamIteratorType:
title: Stream Iterator Type
description: Defines where in the DynamoDB stream to start getting records. There are two enums and the value can be one of FROM_LATEST and FROM_START. Note that using FROM_START can cause a significant delay before the stream has caught up to real-time.
description: Defines where in the DynamoDB stream to start getting records. There are two enums and the value can be one of FROM_LATEST and FROM_START. Note that using FROM_START can cause a significant delay before the stream has caught up to real-time.
type: string
default: FROM_LATEST
useDefaultCredentialsProvider:
Expand All @@ -95,10 +95,52 @@ spec:
description: The number of milliseconds before the next poll from the database.
type: integer
default: 500
types:
dataTypes:
out:
mediaType: application/json
default: json
headers:
CamelAwsDdbStreamEventSource:
title: The DDB Stream Event Source
description: The Amazon Web Services service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb.
type: string
CamelAwsDdbStreamEventId:
title: The DDB Stream Event Id
description: A globally unique identifier for the event that was recorded in this stream record.
type: string
types:
json:
format: "application-json"
description: Default Json representation of a DDB Stream Event.
mediaType: application/json
cloudevents:
format: "aws2-ddbstream:application-cloudevents"
description: |-
Data type transformer converts AWS Dynamo DB Streams get records response to CloudEvent v1_0 data format. The data
type sets Camel specific CloudEvent headers with values extracted from AWS Dynamo DB Streams get records.
headers:
CamelCloudEventID:
title: CloudEvent ID
description: The Camel exchange id set as event id
type: string
CamelCloudEventType:
title: CloudEvent Type
description: The event type
default: "org.apache.camel.event.aws.ddbstream.getRecords"
type: string
CamelCloudEventSource:
title: CloudEvent Source
description: The event source. By default, the DDB Stream Event source receipt handle with prefix "aws.ddbstream.".
type: string
CamelCloudEventSubject:
title: CloudEvent Subject
description: The event subject. The DDB Stream Event Id.
type: string
CamelCloudEventTime:
title: CloudEvent Time
description: The exchange creation timestamp as event time.
type: string
dependencies:
- mvn:org.apache.camel.kamelets:camel-kamelets-utils:2.3.0
- "camel:gson"
- "camel:aws2-ddb"
- "camel:kamelet"
Expand All @@ -115,7 +157,6 @@ spec:
overrideEndpoint: "{{overrideEndpoint}}"
delay: "{{delay}}"
steps:
- marshal:
json:
library: Gson
- transform:
toType: "aws2-ddb:application-x-struct"
- to: "kamelet:sink"
7 changes: 7 additions & 0 deletions library/camel-kamelets-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@
<scope>provided</scope>
</dependency>

<!-- Dependencies for Gson serialization type adapter -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-gson</artifactId>
<scope>provided</scope>
</dependency>

<!-- Test scoped dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kamelets.utils.serialization.gson;

import java.lang.reflect.Type;
import java.time.Instant;

import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;

public class JavaTimeInstantTypeAdapter implements JsonSerializer<Instant>, JsonDeserializer<Instant> {

@Override
public JsonElement serialize(final Instant time, final Type typeOfSrc,
final JsonSerializationContext context) {
return new JsonPrimitive(time.getEpochSecond() * 1000);
}

@Override
public Instant deserialize(final JsonElement json, final Type typeOfT,
final JsonDeserializationContext context) throws JsonParseException {
return Instant.ofEpochMilli(json.getAsLong());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kamelets.utils.transform.aws2.ddb;

import java.time.Instant;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.camel.Message;
import org.apache.camel.kamelets.utils.serialization.gson.JavaTimeInstantTypeAdapter;
import org.apache.camel.spi.DataType;
import org.apache.camel.spi.DataTypeTransformer;
import org.apache.camel.spi.Transformer;

@DataTypeTransformer(name = "aws2-ddb:application-x-struct")
public class Ddb2JsonStructDataTypeTransformer extends Transformer {

private final Gson gson = new GsonBuilder()
.registerTypeAdapter(Instant.class, new JavaTimeInstantTypeAdapter())
.create();

@Override
public void transform(Message message, DataType fromType, DataType toType) {
if (message.getBody() instanceof String) {
return;
}

message.setBody(gson.toJson(message.getBody()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
class=org.apache.camel.kamelets.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"transformer": {
"kind": "transformer",
"name": "aws2-ddb:application-x-struct",
"title": "Aws2 Ddb (Application Json Struct)",
"description": "Transforms DynamoDB record into a Json node",
"deprecated": false,
"javaType": "org.apache.camel.kamelets.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer",
"groupId": "org.apache.camel",
"artifactId": "camel-aws2-ddb",
"version": "4.6.0-SNAPSHOT"
}
}

0 comments on commit 0524fdf

Please sign in to comment.