Fixing the data model in case of some data issues.
piotrczarnas committed Oct 20, 2024
1 parent 53413f8 commit 2b2a9ad
Showing 6 changed files with 48 additions and 18 deletions.
@@ -106,12 +106,12 @@ StringColumn createTimeSeriesUuidColumn(LongColumn sortedDataGroupingHashColumn,
* @param rowCount Row count.
* @return ID column, filled with values.
*/
-StringColumn createStatisticsRowIdColumn(LongColumn sortedDataGroupingHashColumn,
-                                         DateTimeColumn sortedTimePeriodColumn,
-                                         long checkHash,
-                                         long tableHash,
-                                         long columnHash,
-                                         int rowCount);
+StringColumn createSensorReadoutRowIdColumn(LongColumn sortedDataGroupingHashColumn,
+                                            DateTimeColumn sortedTimePeriodColumn,
+                                            long checkHash,
+                                            long tableHash,
+                                            long columnHash,
+                                            int rowCount);

/**
* Creates and fills the "id" column by combining hashes for the error table.
@@ -255,12 +255,12 @@ public StringColumn createTimeSeriesUuidColumn(LongColumn sortedDataGroupingHash
* @return ID column, filled with values.
*/
@Override
-public StringColumn createStatisticsRowIdColumn(LongColumn sortedDataGroupingHashColumn,
-                                                DateTimeColumn sortedTimePeriodColumn,
-                                                long checkHash,
-                                                long tableHash,
-                                                long columnHash,
-                                                int rowCount) {
+public StringColumn createSensorReadoutRowIdColumn(LongColumn sortedDataGroupingHashColumn,
+                                                   DateTimeColumn sortedTimePeriodColumn,
+                                                   long checkHash,
+                                                   long tableHash,
+                                                   long columnHash,
+                                                   int rowCount) {
StringColumn idColumn = StringColumn.create(CommonColumnNames.ID_COLUMN_NAME, rowCount);

for (int i = 0; i < rowCount ; i++) {
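
For reference, a minimal standalone sketch of the idea behind the renamed createSensorReadoutRowIdColumn method. This is not the DQOps implementation; it assumes Tablesaw and Guava on the classpath, and the helper name and key layout are illustrative only: a deterministic per-row id is derived by hashing the data grouping hash, the time period, and the check/table/column hashes together.

import com.google.common.hash.Hashing;
import tech.tablesaw.api.StringColumn;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;

public class RowIdSketch {
    // Hypothetical helper, not the real implementation: derives one stable id string per row.
    static StringColumn createRowIdColumn(long[] groupingHashes, LocalDateTime[] timePeriods,
                                          long checkHash, long tableHash, long columnHash) {
        StringColumn idColumn = StringColumn.create("id", groupingHashes.length);
        for (int i = 0; i < groupingHashes.length; i++) {
            String key = groupingHashes[i] + "|" + timePeriods[i] + "|" +
                    checkHash + "|" + tableHash + "|" + columnHash;
            // farmHashFingerprint64 returns a stable 64-bit fingerprint for the concatenated key
            long fingerprint = Hashing.farmHashFingerprint64()
                    .hashString(key, StandardCharsets.UTF_8).asLong();
            idColumn.set(i, Long.toHexString(fingerprint));
        }
        return idColumn;
    }

    public static void main(String[] args) {
        StringColumn ids = createRowIdColumn(
                new long[] { 101L, 102L },
                new LocalDateTime[] { LocalDateTime.of(2024, 10, 1, 0, 0), LocalDateTime.of(2024, 10, 2, 0, 0) },
                1L, 2L, 3L);
        System.out.println(ids.print());
    }
}
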
@@ -285,7 +285,7 @@ public SensorReadoutsNormalizedResult normalizeResults(SensorExecutionResult sen
sortedNormalizedTable.addColumns(updatedByColumn);

DateTimeColumn sortedTimePeriodColumn = (DateTimeColumn) sortedNormalizedTable.column(SensorReadoutsColumnNames.TIME_PERIOD_COLUMN_NAME);
-StringColumn idColumn = this.commonNormalizationService.createStatisticsRowIdColumn(sortedDataStreamHashColumn, sortedTimePeriodColumn, checkHash, tableHash,
+StringColumn idColumn = this.commonNormalizationService.createSensorReadoutRowIdColumn(sortedDataStreamHashColumn, sortedTimePeriodColumn, checkHash, tableHash,
columnHash != null ? columnHash.longValue() : 0L, resultsRowCount);
sortedNormalizedTable.insertColumn(0, idColumn);

@@ -36,9 +36,11 @@
import com.dqops.metadata.storage.localfiles.userhome.LocalUserHomeFileStorageService;
import com.dqops.utils.datetime.LocalDateTimeTruncateUtility;
import com.dqops.utils.exceptions.DqoRuntimeException;
+import com.dqops.utils.tables.TableColumnUtility;
import com.dqops.utils.tables.TableCompressUtility;
import com.dqops.utils.tables.TableCopyUtility;
import com.dqops.utils.tables.TableMergeUtility;
+import lombok.extern.slf4j.Slf4j;
import net.tlabs.tablesaw.parquet.TablesawParquetReadOptions;
import net.tlabs.tablesaw.parquet.TablesawParquetReader;
import org.apache.commons.lang3.ArrayUtils;
@@ -64,6 +66,7 @@
* Service that supports reading and writing parquet file partitions from a local file system.
*/
@Service
+@Slf4j
public class ParquetPartitionStorageServiceImpl implements ParquetPartitionStorageService {
/**
* The number of retries to write a parquet file when another thread was trying to write the same file.
@@ -489,6 +492,29 @@ else if (datePartitioningColumnOriginal instanceof DateColumn) {
}
}

+// fix some invalid records
+Column<?> idColumn = dataToSave.column(storageSettings.getIdStringColumnName());
+Selection idValueMissing = idColumn.isMissing();
+if (!idValueMissing.isEmpty()) {
+    dataToSave = dataToSave.dropWhere(idValueMissing);
+
+    log.warn("Missing ID column values found when saving a partition, ID column name: " +
+            storageSettings.getIdStringColumnName() + ". Table: " + storageSettings.getTableType() + ", partition: " + loadedPartition.getPartitionId());
+}
+
+// fix some invalid records
+LongColumn connectionHashColumn = (LongColumn) TableColumnUtility.findColumn(dataToSave, CommonColumnNames.CONNECTION_HASH_COLUMN_NAME);
+if (connectionHashColumn != null) {
+    Selection connectionHashMissing = connectionHashColumn.isMissing();
+    if (!connectionHashMissing.isEmpty()) {
+        dataToSave = dataToSave.dropWhere(connectionHashMissing);
+
+        log.warn("Missing connection_hash column values found when saving a partition. Table: " +
+                storageSettings.getTableType() + ", partition: " + loadedPartition.getPartitionId());
+    }
+}
+
+
Configuration hadoopConfiguration = this.hadoopConfigurationProvider.getHadoopConfiguration();
byte[] parquetFileContent = new DqoTablesawParquetWriter(hadoopConfiguration).writeToByteArray(dataToSave);

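The new block above relies on Tablesaw's Selection API to discard rows whose id or connection_hash value is missing before the partition is written. A minimal standalone sketch of that pattern (Tablesaw on the classpath assumed; table and column contents are illustrative, not DQOps data):

import tech.tablesaw.api.LongColumn;
import tech.tablesaw.api.StringColumn;
import tech.tablesaw.api.Table;
import tech.tablesaw.selection.Selection;

public class DropMissingRowsSketch {
    public static void main(String[] args) {
        StringColumn id = StringColumn.create("id");
        id.append("row-1");
        id.appendMissing();          // this row has no id and should be dropped
        id.append("row-3");
        LongColumn connectionHash = LongColumn.create("connection_hash", new long[] { 11L, 22L, 33L });
        Table data = Table.create("partition_data", id, connectionHash);

        Selection idMissing = data.stringColumn("id").isMissing();
        if (!idMissing.isEmpty()) {
            // dropWhere returns a new table without the selected rows
            data = data.dropWhere(idMissing);
        }
        System.out.println(data.rowCount()); // prints 2
    }
}
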
@@ -227,8 +227,8 @@ public CheckExecutionSummary executeChecksOnTable(ExecutionContext executionCont

SensorReadoutsSnapshot outputSensorReadoutsSnapshot = this.sensorReadoutsSnapshotFactory.createSnapshot(connectionName, physicalTableName, userDomainIdentity);
outputSensorReadoutsSnapshot.ensureMonthsAreLoaded(LocalDate.now(), LocalDate.now()); // just load the current month, so the historic cache will not make another call
-SensorReadoutsSnapshot historicReadoutsSnapshot = this.sensorReadoutsSnapshotFactory.createReadOnlySnapshot(connectionName, physicalTableName,
-        SensorReadoutsColumnNames.SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA, userDomainIdentity);
+// SensorReadoutsSnapshot historicReadoutsSnapshot = this.sensorReadoutsSnapshotFactory.createReadOnlySnapshot(connectionName, physicalTableName,
+//         SensorReadoutsColumnNames.SENSOR_READOUT_COLUMN_NAMES_HISTORIC_DATA, userDomainIdentity);

Table allNormalizedSensorResultsTable = outputSensorReadoutsSnapshot.getTableDataChanges().getNewOrChangedRows();
IntColumn severityColumnTemporary = IntColumn.create(CheckResultsColumnNames.SEVERITY_COLUMN_NAME);
@@ -248,13 +248,13 @@ public CheckExecutionSummary executeChecksOnTable(ExecutionContext executionCont
List<AbstractCheckSpec<?, ?, ?, ?>> singleTableChecks = checks.stream().filter(c -> !c.isTableComparisonCheck())
.collect(Collectors.toList());
executeSingleTableChecks(executionContext, userHome, userTimeWindowFilters, progressListener, dummySensorExecution, executionTarget, jobCancellationToken,
-checkExecutionSummary, singleTableChecks, tableSpec, historicReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot,
+checkExecutionSummary, singleTableChecks, tableSpec, outputSensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot,
allRuleEvaluationResultsTable, allErrorsTable, executionStatistics, checksForErrorSampling, collectErrorSamples);

List<AbstractCheckSpec<?, ?, ?, ?>> tableComparisonChecks = checks.stream().filter(c -> c.isTableComparisonCheck())
.collect(Collectors.toList());
executeTableComparisonChecks(executionContext, userHome, userTimeWindowFilters, progressListener, dummySensorExecution, executionTarget, jobCancellationToken,
-checkExecutionSummary, tableComparisonChecks, tableSpec, historicReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot,
+checkExecutionSummary, tableComparisonChecks, tableSpec, outputSensorReadoutsSnapshot, allNormalizedSensorResultsTable, checkResultsSnapshot,
allRuleEvaluationResultsTable, allErrorsTable, executionStatistics);

if (outputSensorReadoutsSnapshot.getTableDataChanges().hasChanges() && !dummySensorExecution) {
6 changes: 5 additions & 1 deletion dqops/src/main/java/com/dqops/metadata/id/HierarchyId.java
@@ -188,7 +188,11 @@ public long hashCode64() {
List<HashCode> elementHashes = Arrays.stream(this.elements)
.map(element -> Hashing.farmHashFingerprint64().hashString(element.toString(), StandardCharsets.UTF_8))
.collect(Collectors.toList());
-return Math.abs(Hashing.combineOrdered(elementHashes).asLong()); // we return only positive hashes which limits the hash space to 2^63, but positive hashes are easier for users
+long hash = Math.abs(Hashing.combineOrdered(elementHashes).asLong());
+if (hash == Long.MAX_VALUE) {
+    hash = Long.MAX_VALUE - 1L;
+}
+return hash; // we return only positive hashes which limits the hash space to 2^63, but positive hashes are easier for users
}

/**
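
A standalone sketch of the hashCode64() change (Guava on the classpath assumed; the element strings below are made up, and this class is an illustrative re-creation rather than HierarchyId itself): ordered 64-bit fingerprints are combined, the absolute value keeps the result in the positive range, and a result equal to Long.MAX_VALUE is shifted down by one so that value never appears among generated hashes.

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HierarchyHashSketch {
    // Illustrative re-creation of the combine-and-clamp logic, not the HierarchyId class itself.
    static long hashCode64(List<String> elements) {
        List<HashCode> elementHashes = elements.stream()
                .map(element -> Hashing.farmHashFingerprint64().hashString(element, StandardCharsets.UTF_8))
                .collect(Collectors.toList());
        long hash = Math.abs(Hashing.combineOrdered(elementHashes).asLong());
        if (hash == Long.MAX_VALUE) {
            hash = Long.MAX_VALUE - 1L;   // keep Long.MAX_VALUE out of the generated range
        }
        return hash;                      // positive hashes only, i.e. a 2^63 hash space
    }

    public static void main(String[] args) {
        System.out.println(hashCode64(Arrays.asList("connections", "my_connection", "tables")));
    }
}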
