Skip to content

Commit

Permalink
Merge pull request #1346 from Zocdoc/issue_1345
Browse files Browse the repository at this point in the history
Adds metadata to KinesisFirehose response record to support dynamic partitioning
  • Loading branch information
normj authored Jan 23, 2023
2 parents afd59e5 + f683d4b commit 84c9c40
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ public class FirehoseRecord
#endif
public string Base64EncodedData { get; set; }

/// <summary>
/// The response record metadata.
/// </summary>
[DataMember(Name = "metadata")]
#if NETCOREAPP_3_1
[System.Text.Json.Serialization.JsonPropertyName("metadata")]
#endif
public FirehoseResponseRecordMetadata Metadata { get; set; }


/// <summary>
/// Base64 encodes the data and sets the Base64EncodedData property.
/// </summary>
Expand All @@ -101,5 +111,22 @@ public void EncodeData(string data)
this.Base64EncodedData = Convert.ToBase64String(Encoding.UTF8.GetBytes(data));
}
}

/// <summary>
/// The response record metadata after processing KinesisFirehoseEvent.Records
/// </summary>
[DataContract]
public class FirehoseResponseRecordMetadata
{
/// <summary>
/// Key Value pairs used for Dynamic Partitioning
/// https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
/// </summary>
[DataMember(Name = "partitionKeys")]
#if NETCOREAPP_3_1
[System.Text.Json.Serialization.JsonPropertyName("partitionKeys")]
#endif
public Dictionary<string, string> PartitionKeys { get; set; }
}
}
}
3 changes: 2 additions & 1 deletion Libraries/test/EventsTests.Shared/EventTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1559,7 +1559,8 @@ public void KinesisFirehoseResponseTest(Type serializerType)
Assert.Equal("49572672223665514422805246926656954630972486059535892482", kinesisResponse.Records[0].RecordId);
Assert.Equal(KinesisFirehoseResponse.TRANSFORMED_STATE_OK, kinesisResponse.Records[0].Result);
Assert.Equal("SEVMTE8gV09STEQ=", kinesisResponse.Records[0].Base64EncodedData);

Assert.Equal("iamValue1", kinesisResponse.Records[0].Metadata.PartitionKeys["iamKey1"]);
Assert.Equal("iamValue2", kinesisResponse.Records[0].Metadata.PartitionKeys["iamKey2"]);


MemoryStream ms = new MemoryStream();
Expand Down
10 changes: 8 additions & 2 deletions Libraries/test/EventsTests.Shared/kinesis-firehose-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
{
"recordId": "49572672223665514422805246926656954630972486059535892482",
"result": "Ok",
"data": "SEVMTE8gV09STEQ="
"data": "SEVMTE8gV09STEQ=",
"metadata": {
"partitionKeys": {
"iamKey1": "iamValue1",
"iamKey2": "iamValue2"
}
}
}
]
}
}

0 comments on commit 84c9c40

Please sign in to comment.