diff --git a/dqops/src/main/java/com/dqops/data/normalization/CommonTableNormalizationService.java b/dqops/src/main/java/com/dqops/data/normalization/CommonTableNormalizationService.java
index e6ab7b6412..560ba96d42 100644
--- a/dqops/src/main/java/com/dqops/data/normalization/CommonTableNormalizationService.java
+++ b/dqops/src/main/java/com/dqops/data/normalization/CommonTableNormalizationService.java
@@ -106,12 +106,12 @@ StringColumn createTimeSeriesUuidColumn(LongColumn sortedDataGroupingHashColumn,
      * @param rowCount Row count.
      * @return ID column, filled with values.
      */
-    StringColumn createStatisticsRowIdColumn(LongColumn sortedDataGroupingHashColumn,
-                                             DateTimeColumn sortedTimePeriodColumn,
-                                             long checkHash,
-                                             long tableHash,
-                                             long columnHash,
-                                             int rowCount);
+    StringColumn createSensorReadoutRowIdColumn(LongColumn sortedDataGroupingHashColumn,
+                                                DateTimeColumn sortedTimePeriodColumn,
+                                                long checkHash,
+                                                long tableHash,
+                                                long columnHash,
+                                                int rowCount);
 
     /**
      * Creates and fills the "id" column by combining hashes for the error table.
diff --git a/dqops/src/main/java/com/dqops/data/normalization/CommonTableNormalizationServiceImpl.java b/dqops/src/main/java/com/dqops/data/normalization/CommonTableNormalizationServiceImpl.java
index dae500c38c..444d24a602 100644
--- a/dqops/src/main/java/com/dqops/data/normalization/CommonTableNormalizationServiceImpl.java
+++ b/dqops/src/main/java/com/dqops/data/normalization/CommonTableNormalizationServiceImpl.java
@@ -255,12 +255,12 @@ public StringColumn createTimeSeriesUuidColumn(LongColumn sortedDataGroupingHash
      * @return ID column, filled with values.
      */
     @Override
-    public StringColumn createStatisticsRowIdColumn(LongColumn sortedDataGroupingHashColumn,
-                                                    DateTimeColumn sortedTimePeriodColumn,
-                                                    long checkHash,
-                                                    long tableHash,
-                                                    long columnHash,
-                                                    int rowCount) {
+    public StringColumn createSensorReadoutRowIdColumn(LongColumn sortedDataGroupingHashColumn,
+                                                       DateTimeColumn sortedTimePeriodColumn,
+                                                       long checkHash,
+                                                       long tableHash,
+                                                       long columnHash,
+                                                       int rowCount) {
         StringColumn idColumn = StringColumn.create(CommonColumnNames.ID_COLUMN_NAME, rowCount);
 
         for (int i = 0; i < rowCount ; i++) {
diff --git a/dqops/src/main/java/com/dqops/data/readouts/normalization/SensorReadoutsNormalizationServiceImpl.java b/dqops/src/main/java/com/dqops/data/readouts/normalization/SensorReadoutsNormalizationServiceImpl.java
index 129058eac0..9a96f77245 100644
--- a/dqops/src/main/java/com/dqops/data/readouts/normalization/SensorReadoutsNormalizationServiceImpl.java
+++ b/dqops/src/main/java/com/dqops/data/readouts/normalization/SensorReadoutsNormalizationServiceImpl.java
@@ -285,7 +285,7 @@ public SensorReadoutsNormalizedResult normalizeResults(SensorExecutionResult sen
         sortedNormalizedTable.addColumns(updatedByColumn);
 
         DateTimeColumn sortedTimePeriodColumn = (DateTimeColumn) sortedNormalizedTable.column(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME);
-        StringColumn idColumn = this.commonNormalizationService.createStatisticsRowIdColumn(sortedDataStreamHashColumn, sortedTimePeriodColumn, checkHash, tableHash,
+        StringColumn idColumn = this.commonNormalizationService.createSensorReadoutRowIdColumn(sortedDataStreamHashColumn, sortedTimePeriodColumn, checkHash, tableHash,
                 columnHash != null ? columnHash.longValue() : 0L, resultsRowCount);
         sortedNormalizedTable.insertColumn(0, idColumn);
diff --git a/dqops/src/main/java/com/dqops/data/storage/ParquetPartitionStorageServiceImpl.java b/dqops/src/main/java/com/dqops/data/storage/ParquetPartitionStorageServiceImpl.java
index 3cbfedd8b2..e01ac7020d 100644
--- a/dqops/src/main/java/com/dqops/data/storage/ParquetPartitionStorageServiceImpl.java
+++ b/dqops/src/main/java/com/dqops/data/storage/ParquetPartitionStorageServiceImpl.java
@@ -36,9 +36,11 @@
 import com.dqops.metadata.storage.localfiles.userhome.LocalUserHomeFileStorageService;
 import com.dqops.utils.datetime.LocalDateTimeTruncateUtility;
 import com.dqops.utils.exceptions.DqoRuntimeException;
+import com.dqops.utils.tables.TableColumnUtility;
 import com.dqops.utils.tables.TableCompressUtility;
 import com.dqops.utils.tables.TableCopyUtility;
 import com.dqops.utils.tables.TableMergeUtility;
+import lombok.extern.slf4j.Slf4j;
 import net.tlabs.tablesaw.parquet.TablesawParquetReadOptions;
 import net.tlabs.tablesaw.parquet.TablesawParquetReader;
 import org.apache.commons.lang3.ArrayUtils;
@@ -64,6 +66,7 @@
  * Service that supports reading and writing parquet file partitions from a local file system.
  */
 @Service
+@Slf4j
 public class ParquetPartitionStorageServiceImpl implements ParquetPartitionStorageService {
     /**
      * The number of retries to write a parquet file when another thread was trying to write the same file.
@@ -489,6 +492,29 @@ else if (datePartitioningColumnOriginal instanceof DateColumn) {
             }
         }
 
+        // fix some invalid records
+        Column<?> idColumn = dataToSave.column(storageSettings.getIdStringColumnName());
+        Selection idValueMissing = idColumn.isMissing();
+        if (!idValueMissing.isEmpty()) {
+            dataToSave = dataToSave.dropWhere(idValueMissing);
+
+            log.warn("Missing ID column values found when saving a partition, ID column name: " +
+                    storageSettings.getIdStringColumnName() + ". Table: " + storageSettings.getTableType() +
+                    ", partition: " + loadedPartition.getPartitionId());
+        }
+
+        // fix some invalid records
+        LongColumn connectionHashColumn = (LongColumn) TableColumnUtility.findColumn(dataToSave, CommonColumnNames.CONNECTION_HASH_COLUMN_NAME);
+        if (connectionHashColumn != null) {
+            Selection connectionHashMissing = connectionHashColumn.isMissing();
+            if (!connectionHashMissing.isEmpty()) {
+                dataToSave = dataToSave.dropWhere(connectionHashMissing);
+
+                log.warn("Missing connection_hash column values found when saving a partition. Table: " +
+                        storageSettings.getTableType() + ", partition: " + loadedPartition.getPartitionId());
+            }
+        }
+
         Configuration hadoopConfiguration = this.hadoopConfigurationProvider.getHadoopConfiguration();
         byte[] parquetFileContent = new DqoTablesawParquetWriter(hadoopConfiguration).writeToByteArray(dataToSave);
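Side note on the row-filtering technique used in the new save path above: Tablesaw's isMissing() returns a Selection of row indexes whose values are missing, and Table.dropWhere(Selection) returns a copy of the table without those rows. The sketch below is a minimal, self-contained illustration of that pattern only, not DQOps code; the class name, column names, and sample data are made up for the example.

import tech.tablesaw.api.DoubleColumn;
import tech.tablesaw.api.StringColumn;
import tech.tablesaw.api.Table;
import tech.tablesaw.selection.Selection;

public class DropMissingIdRowsExample {
    public static void main(String[] args) {
        // build a tiny table where the second row has no "id" value
        StringColumn id = StringColumn.create("id");
        id.append("row-1");
        id.appendMissing();
        id.append("row-3");
        DoubleColumn actualValue = DoubleColumn.create("actual_value", 1.0, 2.0, 3.0);
        Table results = Table.create("sensor_readouts", id, actualValue);

        // find rows with a missing id and drop them before persisting the table
        Selection idValueMissing = results.stringColumn("id").isMissing();
        if (!idValueMissing.isEmpty()) {
            results = results.dropWhere(idValueMissing);
        }

        System.out.println(results.rowCount()); // prints 2
    }
}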
Table: " + + storageSettings.getTableType() + ", partition: " + loadedPartition.getPartitionId()); + } + } + + Configuration hadoopConfiguration = this.hadoopConfigurationProvider.getHadoopConfiguration(); byte[] parquetFileContent = new DqoTablesawParquetWriter(hadoopConfiguration).writeToByteArray(dataToSave); diff --git a/dqops/src/main/java/com/dqops/execution/checks/TableCheckExecutionServiceImpl.java b/dqops/src/main/java/com/dqops/execution/checks/TableCheckExecutionServiceImpl.java index d5c60b8210..8f0517a510 100644 --- a/dqops/src/main/java/com/dqops/execution/checks/TableCheckExecutionServiceImpl.java +++ b/dqops/src/main/java/com/dqops/execution/checks/TableCheckExecutionServiceImpl.java @@ -227,8 +227,8 @@ public CheckExecutionSummary executeChecksOnTable(ExecutionContext executionCont SensorReadoutsSnapshot outputSensorReadoutsSnapshot = this.sensorReadoutsSnapshotFactory.createSnapshot(connectionName, physicalTableName, userDomainIdentity); outputSensorReadoutsSnapshot.ensureMonthsAreLoaded(LocalDate.now(), LocalDate.now()); // just load the current month, so the historic cache will not make another call - SensorReadoutsSnapshot historicReadoutsSnapshot = this.sensorReadoutsSnapshotFactory.createReadOnlySnapshot(connectionName, physicalTableName, - SensorReadoutsColumnNames.SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA, userDomainIdentity); +// SensorReadoutsSnapshot historicReadoutsSnapshot = this.sensorReadoutsSnapshotFactory.createReadOnlySnapshot(connectionName, physicalTableName, +// SensorReadoutsColumnNames.SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA, userDomainIdentity); Table allNormalizedSensorResultsTable = outputSensorReadoutsSnapshot.getTableDataChanges().getNewOrChangedRows(); IntColumn severityColumnTemporary = IntColumn.create(CheckResultsColumnNames.SEVERITY_COLUMN_NAME); @@ -248,13 +248,13 @@ public CheckExecutionSummary executeChecksOnTable(ExecutionContext executionCont List> singleTableChecks = checks.stream().filter(c -> !c.isTableComparisonCheck()) .collect(Collectors.toList()); executeSingleTableChecks(executionContext, userHome, userTimeWindowFilters, progressListener, dummySensorExecution, executionTarget, jobCancellationToken, - checkExecutionSummary, singleTableChecks, tableSpec, historicReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot, + checkExecutionSummary, singleTableChecks, tableSpec, outputSensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot, allRuleEvaluationResultsTable, allErrorsTable, executionStatistics, checksForErrorSampling, collectErrorSamples); List> tableComparisonChecks = checks.stream().filter(c -> c.isTableComparisonCheck()) .collect(Collectors.toList()); executeTableComparisonChecks(executionContext, userHome, userTimeWindowFilters, progressListener, dummySensorExecution, executionTarget, jobCancellationToken, - checkExecutionSummary, tableComparisonChecks, tableSpec, historicReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot, + checkExecutionSummary, tableComparisonChecks, tableSpec, outputSensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot, allRuleEvaluationResultsTable, allErrorsTable, executionStatistics); if (outputSensorReadoutsSnapshot.getTableDataChanges().hasChanges() && !dummySensorExecution) { diff --git a/dqops/src/main/java/com/dqops/metadata/id/HierarchyId.java b/dqops/src/main/java/com/dqops/metadata/id/HierarchyId.java index a34d31d513..af1eedbea9 100644 --- a/dqops/src/main/java/com/dqops/metadata/id/HierarchyId.java +++ 
diff --git a/dqops/src/main/java/com/dqops/metadata/id/HierarchyId.java b/dqops/src/main/java/com/dqops/metadata/id/HierarchyId.java
index a34d31d513..af1eedbea9 100644
--- a/dqops/src/main/java/com/dqops/metadata/id/HierarchyId.java
+++ b/dqops/src/main/java/com/dqops/metadata/id/HierarchyId.java
@@ -188,7 +188,11 @@ public long hashCode64() {
         List<HashCode> elementHashes = Arrays.stream(this.elements)
                 .map(element -> Hashing.farmHashFingerprint64().hashString(element.toString(), StandardCharsets.UTF_8))
                 .collect(Collectors.toList());
-        return Math.abs(Hashing.combineOrdered(elementHashes).asLong()); // we return only positive hashes which limits the hash space to 2^63, but positive hashes are easier for users
+        long hash = Math.abs(Hashing.combineOrdered(elementHashes).asLong());
+        if (hash == Long.MAX_VALUE) {
+            hash = Long.MAX_VALUE - 1L;
+        }
+        return hash; // we return only positive hashes which limits the hash space to 2^63, but positive hashes are easier for users
     }
 
     /**
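For reference, a minimal, self-contained sketch of the hashing scheme touched by the last hunk: each hierarchy element is fingerprinted with FarmHash, the fingerprints are combined in order with Guava's Hashing.combineOrdered, the result is forced positive with Math.abs, and Long.MAX_VALUE is remapped to Long.MAX_VALUE - 1 exactly as the patch does (the diff itself does not state why that value is excluded). The class name and sample elements below are hypothetical, not DQOps code.

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HierarchyHashSketch {
    // combines per-element FarmHash fingerprints in order, mirroring the patched hashCode64() logic
    public static long hashCode64(Object... elements) {
        List<HashCode> elementHashes = Arrays.stream(elements)
                .map(element -> Hashing.farmHashFingerprint64().hashString(element.toString(), StandardCharsets.UTF_8))
                .collect(Collectors.toList());
        long hash = Math.abs(Hashing.combineOrdered(elementHashes).asLong());
        if (hash == Long.MAX_VALUE) {
            hash = Long.MAX_VALUE - 1L; // mirror the patch: never return Long.MAX_VALUE
        }
        return hash;
    }

    public static void main(String[] args) {
        // hypothetical hierarchy path elements
        System.out.println(hashCode64("my_connection", "tables", "public.fact_sales"));
    }
}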