Skip to content

Commit

Permalink
Refactor GlobalClockRule (#32932)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 20, 2024
1 parent 366ea48 commit b7294f7
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,14 @@ public final class GlobalClockTransactionHook extends TransactionHookAdapter {

@Override
public void init(final Properties props) {
if (!Boolean.parseBoolean(props.getProperty("enabled"))) {
enabled = false;
return;
}
DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, props.getProperty("trunkType"));
Optional<GlobalClockTransactionExecutor> globalClockTransactionExecutor = DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType);
if (!globalClockTransactionExecutor.isPresent()) {
enabled = false;
return;
}
enabled = true;
this.globalClockTransactionExecutor = globalClockTransactionExecutor.get();
globalClockProvider = TypedSPILoader.getService(GlobalClockProvider.class, String.join(".", props.getProperty("type"), props.getProperty("provider")));

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,40 +36,46 @@
/**
* Global clock rule.
*/
@Getter
public final class GlobalClockRule implements GlobalRule {

@Getter
private final GlobalClockRuleConfiguration configuration;

private final GlobalClockProvider provider;

public GlobalClockRule(final GlobalClockRuleConfiguration ruleConfig, final Map<String, ShardingSphereDatabase> databases) {
configuration = ruleConfig;
if (ruleConfig.isEnabled()) {
TypedSPILoader.getService(GlobalClockProvider.class, getGlobalClockProviderType(), configuration.getProps());
provider = TypedSPILoader.getService(GlobalClockProvider.class, getGlobalClockProviderType(), configuration.getProps());
TypedSPILoader.getService(TransactionHook.class, "GLOBAL_CLOCK", createProperties(databases));
} else {
provider = null;
}
}

private String getGlobalClockProviderType() {
return String.join(".", configuration.getType(), configuration.getProvider());
}

private Properties createProperties(final Map<String, ShardingSphereDatabase> databases) {
Properties result = new Properties();
DatabaseType storageType = findStorageType(databases.values()).orElseGet(DatabaseTypeEngine::getDefaultStorageType);
result.setProperty("trunkType", storageType.getTrunkDatabaseType().orElse(storageType).getType());
result.setProperty("enabled", String.valueOf(configuration.isEnabled()));
result.setProperty("type", configuration.getType());
result.setProperty("provider", configuration.getProvider());
return result;
}

private Optional<DatabaseType> findStorageType(final Collection<ShardingSphereDatabase> databases) {
return databases.stream()
.flatMap(each -> each.getResourceMetaData().getStorageUnits().values().stream()).findFirst().map(StorageUnit::getStorageType);
return databases.stream().flatMap(each -> each.getResourceMetaData().getStorageUnits().values().stream()).findFirst().map(StorageUnit::getStorageType);
}

/**
* Get global clock provider type.
* Get current timestamp.
*
* @return global clock provider type
* @return current timestamp
*/
public String getGlobalClockProviderType() {
return String.join(".", configuration.getType(), configuration.getProvider());
public long getCurrentTimestamp() {
return null == provider ? 0L : provider.getCurrentTimestamp();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
import org.apache.commons.codec.binary.Base64;
import org.apache.shardingsphere.distsql.handler.engine.query.DistSQLQueryExecutor;
import org.apache.shardingsphere.distsql.statement.ral.queryable.export.ExportMetaDataStatement;
import org.apache.shardingsphere.globalclock.provider.GlobalClockProvider;
import org.apache.shardingsphere.globalclock.rule.GlobalClockRule;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper;
Expand Down Expand Up @@ -129,11 +127,8 @@ private String generateRulesData(final Collection<RuleConfiguration> rules) {
private void generateSnapshotInfo(final ShardingSphereMetaData metaData, final ExportedClusterInfo exportedClusterInfo) {
GlobalClockRule globalClockRule = metaData.getGlobalRuleMetaData().getSingleRule(GlobalClockRule.class);
if (globalClockRule.getConfiguration().isEnabled()) {
GlobalClockProvider globalClockProvider = TypedSPILoader.getService(GlobalClockProvider.class,
globalClockRule.getGlobalClockProviderType(), globalClockRule.getConfiguration().getProps());
long csn = globalClockProvider.getCurrentTimestamp();
ExportedSnapshotInfo snapshotInfo = new ExportedSnapshotInfo();
snapshotInfo.setCsn(String.valueOf(csn));
snapshotInfo.setCsn(String.valueOf(globalClockRule.getCurrentTimestamp()));
snapshotInfo.setCreateTime(LocalDateTime.now());
exportedClusterInfo.setSnapshotInfo(snapshotInfo);
}
Expand Down

0 comments on commit b7294f7

Please sign in to comment.