Skip to content

Commit

Permalink
Merge branch 'main' into enable_black_again
Browse files Browse the repository at this point in the history
  • Loading branch information
johevemi authored Sep 13, 2023
2 parents c1c005c + 3a7e994 commit be9fa8e
Show file tree
Hide file tree
Showing 13 changed files with 383 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
using Energinet.DataHub.Core.Databricks.SqlStatementExecution;
using Energinet.DataHub.Core.Databricks.SqlStatementExecution.Internal.Models;
using Energinet.DataHub.Wholesale.Batches.Interfaces;
using Energinet.DataHub.Wholesale.Batches.Interfaces.Models;
using Energinet.DataHub.Wholesale.CalculationResults.Infrastructure.SqlStatements;
using Energinet.DataHub.Wholesale.CalculationResults.Infrastructure.SqlStatements.DeltaTableConstants;
using Energinet.DataHub.Wholesale.CalculationResults.Infrastructure.SqlStatements.Mappers;
using Energinet.DataHub.Wholesale.CalculationResults.Interfaces.CalculationResults;
using Energinet.DataHub.Wholesale.CalculationResults.Interfaces.CalculationResults.Model;
using Energinet.DataHub.Wholesale.Common.Databricks.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NodaTime;
using NodaTime.Extensions;

namespace Energinet.DataHub.Wholesale.CalculationResults.Infrastructure.CalculationResults;
Expand All @@ -46,6 +47,20 @@ public async IAsyncEnumerable<CalculationResult> GetAsync(Guid batchId)
{
var batch = await _batchesClient.GetAsync(batchId).ConfigureAwait(false);
var sql = CreateBatchResultsSql(batchId);
await foreach (var p in GetInternalAsync(sql, batch.PeriodStart.ToInstant(), batch.PeriodEnd.ToInstant()))
yield return p;
_logger.LogDebug("Fetched all calculation results for batch {BatchId}", batchId);
}

public async IAsyncEnumerable<CalculationResult> GetAsync(CalculationResultQuery query)
{
var sql = CreateRequestSql(query);
await foreach (var p in GetInternalAsync(sql, query.StartOfPeriod, query.EndOfPeriod))
yield return p;
}

private async IAsyncEnumerable<CalculationResult> GetInternalAsync(string sql, Instant periodStart, Instant periodEnd)
{
var timeSeriesPoints = new List<TimeSeriesPoint>();
SqlResultRow? currentRow = null;
var resultCount = 0;
Expand All @@ -56,7 +71,7 @@ public async IAsyncEnumerable<CalculationResult> GetAsync(Guid batchId)

if (currentRow != null && BelongsToDifferentResults(currentRow, nextRow))
{
yield return CreateCalculationResult(batch, currentRow, timeSeriesPoints);
yield return CreateCalculationResult(currentRow, timeSeriesPoints, periodStart, periodEnd);
resultCount++;
timeSeriesPoints = new List<TimeSeriesPoint>();
}
Expand All @@ -67,11 +82,25 @@ public async IAsyncEnumerable<CalculationResult> GetAsync(Guid batchId)

if (currentRow != null)
{
yield return CreateCalculationResult(batch, currentRow, timeSeriesPoints);
yield return CreateCalculationResult(currentRow, timeSeriesPoints, periodStart, periodEnd);
resultCount++;
}

_logger.LogDebug("Fetched all {ResultCount} results for batch {BatchId}", resultCount, batchId);
_logger.LogDebug("Fetched {ResultCount} calculation results", resultCount);
}

private string CreateRequestSql(CalculationResultQuery query)
{
return $@"
SELECT {string.Join(", ", SqlColumnNames)}
FROM {_deltaTableOptions.SCHEMA_NAME}.{_deltaTableOptions.ENERGY_RESULTS_TABLE_NAME}
WHERE {EnergyResultColumnNames.TimeSeriesType} = '{query.TimeSeriesType.ToLower()}'
AND {EnergyResultColumnNames.AggregationLevel} = '{MapAggregationLevel(query.AggregationLevel)}'
AND {EnergyResultColumnNames.GridArea} = '{query.GridArea}'
AND {EnergyResultColumnNames.Time} >= '{query.StartOfPeriod.ToString()}'
AND {EnergyResultColumnNames.Time} <= '{query.EndOfPeriod.ToString()}'
ORDER BY {EnergyResultColumnNames.CalculationResultId}, {EnergyResultColumnNames.Time}
";
}

private string CreateBatchResultsSql(Guid batchId)
Expand All @@ -84,6 +113,13 @@ private string CreateBatchResultsSql(Guid batchId)
";
}

private string MapAggregationLevel(AggregationLevel queryAggregationLevel)
{
if (queryAggregationLevel == AggregationLevel.GridArea)
return DeltaTableAggregationLevel.GridArea;
throw new InvalidOperationException($"Unsupported aggregation level: {queryAggregationLevel}");
}

public static string[] SqlColumnNames { get; } =
{
EnergyResultColumnNames.BatchId,
Expand All @@ -96,6 +132,7 @@ private string CreateBatchResultsSql(Guid batchId)
EnergyResultColumnNames.Quantity,
EnergyResultColumnNames.QuantityQuality,
EnergyResultColumnNames.CalculationResultId,
EnergyResultColumnNames.BatchProcessType,
};

public static bool BelongsToDifferentResults(SqlResultRow row, SqlResultRow otherRow)
Expand All @@ -112,27 +149,31 @@ private static TimeSeriesPoint CreateTimeSeriesPoint(SqlResultRow row)
}

private static CalculationResult CreateCalculationResult(
BatchDto batch,
SqlResultRow sqlResultRow,
List<TimeSeriesPoint> timeSeriesPoints)
List<TimeSeriesPoint> timeSeriesPoints,
Instant periodStart,
Instant periodEnd)
{
var id = SqlResultValueConverters.ToGuid(sqlResultRow[EnergyResultColumnNames.CalculationResultId]);
var timeSeriesType = SqlResultValueConverters.ToTimeSeriesType(sqlResultRow[EnergyResultColumnNames.TimeSeriesType]);
var energySupplierId = sqlResultRow[EnergyResultColumnNames.EnergySupplierId];
var balanceResponsibleId = sqlResultRow[EnergyResultColumnNames.BalanceResponsibleId];
var gridArea = sqlResultRow[EnergyResultColumnNames.GridArea];
var fromGridArea = sqlResultRow[EnergyResultColumnNames.FromGridArea];
var batchId = sqlResultRow[EnergyResultColumnNames.BatchId];
var processType = sqlResultRow[EnergyResultColumnNames.BatchProcessType];

return new CalculationResult(
id,
batch.BatchId,
Guid.Parse(batchId),
gridArea,
timeSeriesType,
energySupplierId,
balanceResponsibleId,
timeSeriesPoints.ToArray(),
batch.ProcessType,
batch.PeriodStart.ToInstant(),
batch.PeriodEnd.ToInstant(),
ProcessTypeMapper.FromDeltaTableValue(processType),
periodStart,
periodEnd,
fromGridArea);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
using Energinet.DataHub.Wholesale.CalculationResults.Infrastructure.CalculationResults;
using Energinet.DataHub.Wholesale.CalculationResults.Infrastructure.SqlStatements.DeltaTableConstants;
using Energinet.DataHub.Wholesale.CalculationResults.IntegrationTests.Fixtures;
using Energinet.DataHub.Wholesale.CalculationResults.Interfaces.CalculationResults.Model;
using Energinet.DataHub.Wholesale.Common.Databricks.Options;
using FluentAssertions;
using FluentAssertions.Execution;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Moq;
using NodaTime;
using Xunit;

namespace Energinet.DataHub.Wholesale.CalculationResults.IntegrationTests.Infrastructure.CalculationResults;
Expand Down Expand Up @@ -76,6 +78,78 @@ public async Task GetAsync_ReturnsExpectedCalculationResult(
.Equal(FirstQuantity, SecondQuantity, ThirdQuantity, FourthQuantity, FifthQuantity, SixthQuantity);
}

[Theory]
[InlineAutoMoqData]
public async Task GetAsync_RequestFromGridOperatorTotalProduction_ReturnsResult(
Mock<ILogger<DatabricksSqlStatusResponseParser>> loggerMock,
Mock<IBatchesClient> batchesClientMock,
Mock<ILogger<CalculationResultQueries>> calculationResultQueriesLoggerMock)
{
// Arrange
var gridAreaFilter = "101";
var timeSeriesTypeFilter = TimeSeriesType.Production;
var startOfPeriodFilter = Instant.FromUtc(2022, 1, 1, 0, 0);
var endOfPeriodFilter = Instant.FromUtc(2022, 1, 2, 0, 0);
var deltaTableOptions = _fixture.DatabricksSchemaManager.DeltaTableOptions;
await AddCreatedRowsInArbitraryOrderAsync(deltaTableOptions);
var sqlStatementClient = _fixture.CreateSqlStatementClient(loggerMock, new Mock<ILogger<SqlStatementClient>>());
var request = CreateRequest(
gridArea: gridAreaFilter,
timeSeriesType: timeSeriesTypeFilter.ToString(),
startOfPeriod: startOfPeriodFilter,
endOfPeriod: endOfPeriodFilter);
var sut = new CalculationResultQueries(sqlStatementClient, batchesClientMock.Object, deltaTableOptions, calculationResultQueriesLoggerMock.Object);

// Act
var actual = await sut.GetAsync(request).ToListAsync();

// Assert
actual.Should().NotBeEmpty();
actual.Should().OnlyContain(
result => result.GridArea.Equals(gridAreaFilter)
&& result.PeriodStart == startOfPeriodFilter
&& result.PeriodEnd == endOfPeriodFilter
&& result.TimeSeriesType.Equals(timeSeriesTypeFilter));
}

[Theory]
[InlineAutoMoqData]
public async Task GetAsync_RequestFromGridOperatorTotalProductionInWrongPeriod_ReturnsNoResults(
Mock<ILogger<DatabricksSqlStatusResponseParser>> loggerMock,
Mock<IBatchesClient> batchesClientMock,
Mock<ILogger<CalculationResultQueries>> calculationResultQueriesLoggerMock)
{
// Arrange
var deltaTableOptions = _fixture.DatabricksSchemaManager.DeltaTableOptions;
await AddCreatedRowsInArbitraryOrderAsync(deltaTableOptions);
var sqlStatementClient = _fixture.CreateSqlStatementClient(loggerMock, new Mock<ILogger<SqlStatementClient>>());

var request = CreateRequest(
startOfPeriod: Instant.FromUtc(2020, 1, 1, 1, 1),
endOfPeriod: Instant.FromUtc(2021, 1, 2, 1, 1));
var sut = new CalculationResultQueries(sqlStatementClient, batchesClientMock.Object, deltaTableOptions, calculationResultQueriesLoggerMock.Object);

// Act
var actual = await sut.GetAsync(request).ToListAsync();

// Assert
actual.Should().BeEmpty();
}

private CalculationResultQuery CreateRequest(
string? timeSeriesType = null,
Instant? startOfPeriod = null,
Instant? endOfPeriod = null,
string gridArea = "101")
{
return new CalculationResultQuery(
TimeSeriesType: timeSeriesType ?? nameof(TimeSeriesType.Production),
StartOfPeriod: startOfPeriod ?? Instant.FromUtc(2022, 1, 1, 0, 0),
EndOfPeriod: endOfPeriod ?? Instant.FromUtc(2022, 1, 2, 0, 0),
GridArea: gridArea,
AggregationLevel: AggregationLevel.GridArea);
}

private async Task AddCreatedRowsInArbitraryOrderAsync(IOptions<DeltaTableOptions> options)
{
const string firstCalculationResultId = "b55b6f74-386f-49eb-8b56-63fae62e4fc7";
Expand All @@ -98,6 +172,7 @@ private async Task AddCreatedRowsInArbitraryOrderAsync(IOptions<DeltaTableOption

// mix up the order of the rows
var rows = new List<IEnumerable<string>> { row3, row5, row1, row2, row6, row4, };

await _fixture.DatabricksSchemaManager.InsertAsync<EnergyResultColumnNames>(options.Value.ENERGY_RESULTS_TABLE_NAME, rows);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ public interface ICalculationResultQueries
/// Get all results for a given batch.
/// </summary>
IAsyncEnumerable<CalculationResult> GetAsync(Guid batchId);

/// <summary>
/// Gets all result for a given request.
/// </summary>
/// <param name="query"></param>
IAsyncEnumerable<CalculationResult> GetAsync(CalculationResultQuery query);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Energinet.DataHub.Wholesale.CalculationResults.Interfaces.CalculationResults.Model;

public enum AggregationLevel
{
GridArea,
EnergySupplierAndGridArea,
BalanceResponsibleAndGridArea,
EnergySupplierAndBalanceResponsibleAndGridArea,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using NodaTime;

namespace Energinet.DataHub.Wholesale.CalculationResults.Interfaces.CalculationResults.Model;

public record CalculationResultQuery(
string TimeSeriesType,
Instant StartOfPeriod,
Instant EndOfPeriod,
string GridArea,
AggregationLevel AggregationLevel);
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public CalculationResultQueriesTests()
_row0BatchId = "b78787d5-b544-44ac-87c2-7720aab86ed1";
_calculationResultId0 = "9913f3bb-1208-400b-9cbe-50300e386d26";
const string calculationResultId1 = "8c2bb7c6-d8e5-462c-9bce-8537f93ef8e7";
var row0 = new[] { _row0BatchId, "100", "200", "non_profiled_consumption", string.Empty, string.Empty, "2022-05-16T22:00:00.000Z", "1.111", "measured", _calculationResultId0 };
var row1 = new[] { "b78787d5-b544-44ac-87c2-7720aab86ed2", "200", "100", "non_profiled_consumption", string.Empty, string.Empty, "2022-05-16T22:00:00.000Z", "2.222", "measured", calculationResultId1 };
var row0 = new[] { _row0BatchId, "100", "200", "non_profiled_consumption", string.Empty, string.Empty, "2022-05-16T22:00:00.000Z", "1.111", "measured", _calculationResultId0, DeltaTableProcessType.Aggregation };
var row1 = new[] { "b78787d5-b544-44ac-87c2-7720aab86ed2", "200", "100", "non_profiled_consumption", string.Empty, string.Empty, "2022-05-16T22:00:00.000Z", "2.222", "measured", calculationResultId1, DeltaTableProcessType.BalanceFixing };
var rows = new List<string[]> { row0, row1, };

// Using the columns from the CalculationResultQueries class to ensure that the test is not broken if the columns are changed
Expand All @@ -51,11 +51,17 @@ public CalculationResultQueriesTests()
[Theory]
[InlineAutoMoqData]
public async Task GetAsync_WhenNoRows_ReturnsNoResults(
Guid batchId,
BatchDto batch,
[Frozen] Mock<IBatchesClient> batchesClientMock,
[Frozen] Mock<ISqlStatementClient> sqlStatementClientMock,
CalculationResultQueries sut)
{
// Arrange
var batchId = Guid.Parse(_row0BatchId);
batch = batch with { BatchId = batchId };
batchesClientMock
.Setup(client => client.GetAsync(batchId))
.ReturnsAsync(batch);
sqlStatementClientMock
.Setup(x => x.ExecuteAsync(It.IsAny<string>()))
.Returns(GetRowsAsync(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Batches\Batches.Interfaces\Batches.Interfaces.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,4 @@ namespace Energinet.DataHub.Wholesale.Events.Application.InboxEvents;
public record AggregatedTimeSeriesRequest(
Period Period,
TimeSeriesType TimeSeriesType,
AggregationPerGridArea? AggregationPerGridArea,
AggregationPerEnergySupplierPerGridArea? AggregationPerEnergySupplierPerGridArea,
AggregationPerBalanceResponsiblePartyPerGridArea? AggregationPerBalanceResponsiblePartyPerGridArea,
AggregationPerEnergySupplierPerBalanceResponsiblePartyPerGridArea?
AggregationPerEnergySupplierPerBalanceResponsiblePartyPerGridArea);
AggregationPerGridArea? AggregationPerGridArea);
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

using Azure.Messaging.ServiceBus;
using Energinet.DataHub.Wholesale.CalculationResults.Interfaces.CalculationResults.Model;

namespace Energinet.DataHub.Wholesale.Events.Application.InboxEvents;

Expand All @@ -21,8 +22,8 @@ public interface IAggregatedTimeSeriesMessageFactory
/// <summary>
/// Creates a service bus message based on aggregated time series
/// </summary>
/// <param name="aggregatedTimeSeries"></param>
/// <param name="calculationResults"></param>
/// <param name="referenceId"></param>
/// <param name="isRejected">Temporary switch for generating accepted or rejected message</param>
public ServiceBusMessage Create(List<object> aggregatedTimeSeries, string referenceId, bool isRejected);
public ServiceBusMessage Create(IList<CalculationResult> calculationResults, string referenceId, bool isRejected);
}
Loading

0 comments on commit be9fa8e

Please sign in to comment.