Skip to content

Commit

Permalink
RANGER-4914: Tagsync support for Ozone OFS paths
Browse files Browse the repository at this point in the history
  • Loading branch information
fateh288 committed Sep 12, 2024
1 parent 6e94858 commit bad1f24
Show file tree
Hide file tree
Showing 2 changed files with 259 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class AtlasOzoneResourceMapper extends AtlasResourceMapper {
private static final Logger LOG = LoggerFactory.getLogger(AtlasOzoneResourceMapper.class);
Expand All @@ -51,9 +52,32 @@ public class AtlasOzoneResourceMapper extends AtlasResourceMapper {
private static final int IDX_CLUSTER_NAME = 3;
private static final int RESOURCE_COUNT = 4;

// This flag results in ofs atlas qualifiedName to parse paths similar to o3fs
public static final String PROP_LEGACY_PARSING = "ranger.tagsync.atlas.ozone.legacy.parsing.enabled";
public static final String PROP_OFS_KEY_DELIMITER = "ranger.tagsync.atlas.ozone.ofs.key_entity.separator";
public static final String PROP_OFS_BUCKET_DELIMITER = "ranger.tagsync.atlas.ozone.ofs.bucket_entity.separator";

private String ofsKeyDelimiter = "/";
private String ofsBucketDelimiter = "\\.";
private boolean legacyParsingEnabled = false;

public AtlasOzoneResourceMapper() {
super("ozone", SUPPORTED_ENTITY_TYPES);
}
@Override
public void initialize(Properties properties) {
super.initialize(properties);

if (this.properties != null) {
this.legacyParsingEnabled = Boolean.parseBoolean((String) this.properties.getOrDefault(PROP_LEGACY_PARSING, Boolean.toString(legacyParsingEnabled)));
this.ofsKeyDelimiter = (String) this.properties.getOrDefault(PROP_OFS_KEY_DELIMITER, this.ofsKeyDelimiter);
this.ofsBucketDelimiter = (String) this.properties.getOrDefault(PROP_OFS_BUCKET_DELIMITER, this.ofsBucketDelimiter);
}

LOG.info("ofsKeyDelimiter={}", this.ofsKeyDelimiter);
LOG.info("ofsBucketDelimiter={}", this.ofsBucketDelimiter);
LOG.info("legacyParsingEnabled={}",this.legacyParsingEnabled);
}

@Override
public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception {
Expand Down Expand Up @@ -134,8 +158,33 @@ public RangerServiceResource buildResource(final RangerAtlasEntity entity) throw
* o3fs://<volume name>@cm (ozone_key)
* o3fs://<volume name>.<bucket name>@<clusterName> (ozone_bucket)
* o3fs://<bucket name>.<volume name>.<ozone service id>/<key path>@<clusterName> (ozone_key)
* ofs://myvolume@cl1
* ofs://myvolume.mybucket@cl1
* ofs://ozone1/myvolume/mybucket/key1@cl1
* ofs://ozone1/myvolume/mybucket/mykey/key1/@cl1
*/
private String[] parseQualifiedName(String qualifiedName, String entityType) {
int idxProtocolSep = qualifiedName.indexOf(SEP_PROTOCOL);
String prefix = idxProtocolSep != -1 ? qualifiedName.substring(0, idxProtocolSep) : "";

if (LOG.isDebugEnabled()) {
LOG.debug("Prefix for qualifiedName={} is {}", qualifiedName, prefix);
}

if (this.legacyParsingEnabled){
return parseQualifiedNameO3FS(qualifiedName, entityType);
} else if (prefix.equals("ofs")) {
return parseQualifiedNameOFS(qualifiedName, entityType);
} else {
return parseQualifiedNameO3FS(qualifiedName, entityType);
}
}

private String[] parseQualifiedNameOFS(String qualifiedName, String entityType) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> parseQualifiedNameOFS(qualifiedName={}, entityType={})", qualifiedName, entityType);
}

String[] ret = new String[RESOURCE_COUNT];

if(StringUtils.isNotBlank(qualifiedName)) {
Expand All @@ -147,26 +196,88 @@ private String[] parseQualifiedName(String qualifiedName, String entityType) {
int idxProtocolSep = qualifiedName.indexOf(SEP_PROTOCOL);

if (idxProtocolSep != -1) {
int idxResourceStart = idxProtocolSep + SEP_PROTOCOL.length();
if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_VOLUME)) {
int idxResourceStart = idxProtocolSep + SEP_PROTOCOL.length();

if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_VOLUME)) { // ofs://vol1@cl1
ret[IDX_VOLUME] = qualifiedName.substring(idxResourceStart, idxClusterNameSep);
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_BUCKET)) {
String[] resources = qualifiedName.substring(idxResourceStart, idxClusterNameSep).split(QUALIFIED_NAME_DELIMITER);
ret[IDX_VOLUME] = resources.length > 0 ? resources[0] : null;
ret[IDX_BUCKET] = resources.length > 1 ? resources[1] : null;
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_KEY)) {
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_BUCKET)) { // ofs://vol1.buck1@cl1
// anything before first "." is volume name, after that is bucket name. So, "." in volume name is invalid when tagging buckets
String[] resources = qualifiedName.substring(idxResourceStart, idxClusterNameSep).split(this.ofsBucketDelimiter,2);

ret[IDX_VOLUME] = resources.length > 0 ? resources[0] : null;
ret[IDX_BUCKET] = resources.length > 1 ? resources[1] : null;
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_KEY)) { // ofs://svcid/vol1/buck1/d1/d2/key1@cl1
// This is a special case wherein the delimiter is a "/" instead of a "." in the qualifiedName in ofs path
idxResourceStart = qualifiedName.indexOf(this.ofsKeyDelimiter, idxProtocolSep + SEP_PROTOCOL.length()) + 1;

String resourceString = qualifiedName.substring(idxResourceStart, idxClusterNameSep);
String[] resources = resourceString.split(this.ofsKeyDelimiter, 3);

ret[IDX_VOLUME] = resources.length > 0 ? resources[0] : null;
ret[IDX_BUCKET] = resources.length > 1 ? resources[1] : null;
ret[IDX_KEY] = resources.length > 2 ? resources[2] : null;
}
}
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("<== parseQualifiedNameOFS(qualifiedName={}, entityType={}): volume={}, bucket={}, key={}, clusterName={}", qualifiedName, entityType, ret[IDX_VOLUME], ret[IDX_BUCKET], ret[IDX_KEY], ret[IDX_CLUSTER_NAME]);
}

return ret;
}

private String[] parseQualifiedNameO3FS(String qualifiedName, String entityType){
if (LOG.isDebugEnabled()) {
LOG.debug("==> parseQualifiedNameO3FS(qualifiedName={}, entityType={})", qualifiedName, entityType);
}

String[] ret = new String[RESOURCE_COUNT];

if(StringUtils.isNotBlank(qualifiedName)) {
int idxClusterNameSep = qualifiedName.lastIndexOf(CLUSTER_DELIMITER);

if (idxClusterNameSep != -1) {
ret[IDX_CLUSTER_NAME] = qualifiedName.substring(idxClusterNameSep + CLUSTER_DELIMITER.length());

int idxProtocolSep = qualifiedName.indexOf(SEP_PROTOCOL);

if (idxProtocolSep != -1) {
int idxResourceStart = idxProtocolSep + SEP_PROTOCOL.length();

if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_VOLUME)) { // o3fs://vol1@cl1
ret[IDX_VOLUME] = qualifiedName.substring(idxResourceStart, idxClusterNameSep);
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_BUCKET)) { // o3fs://vol1.buck1@cl1
String[] resources = qualifiedName.substring(idxResourceStart, idxClusterNameSep).split(QUALIFIED_NAME_DELIMITER);
ret[IDX_BUCKET] = resources.length > 0 ? resources[0] : null;
ret[IDX_VOLUME] = resources.length > 1 ? resources[1] : null;
int idxRelativePath = qualifiedName.indexOf(SEP_RELATIVE_PATH, idxResourceStart);
if (idxRelativePath != -1) {
ret[IDX_KEY] = qualifiedName.substring(idxRelativePath+1, idxClusterNameSep);

ret[IDX_VOLUME] = resources.length > 0 ? resources[0] : null;
ret[IDX_BUCKET] = resources.length > 1 ? resources[1] : null;
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_KEY)) { // o3fs://buck1.vol1.svc1/d1/d2/key1@cl1
String[] resources = qualifiedName.substring(idxResourceStart, idxClusterNameSep).split(QUALIFIED_NAME_DELIMITER, 3);

ret[IDX_BUCKET] = resources.length > 0 ? resources[0] : null;
ret[IDX_VOLUME] = resources.length > 1 ? resources[1] : null;
ret[IDX_KEY] = resources.length > 2 ? resources[2] : null;

if (ret[IDX_KEY] != null) { // skip svcid
int idxKeySep = ret[IDX_KEY].indexOf(SEP_RELATIVE_PATH);

if (idxKeySep != -1) {
ret[IDX_KEY] = ret[IDX_KEY].substring(idxKeySep + SEP_RELATIVE_PATH.length());
} else {
ret[IDX_KEY] = null;
}
}
}
}
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("<== parseQualifiedNameO3FS(qualifiedName={}, entityType={}): volume={}, bucket={}, key={}, clusterName={}", qualifiedName, entityType, ret[IDX_VOLUME], ret[IDX_BUCKET], ret[IDX_KEY], ret[IDX_CLUSTER_NAME]);
}

return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.ranger.tagsync.source.atlas.AtlasOzoneResourceMapper;
import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Collections;
import java.util.Properties;

import static org.apache.ranger.tagsync.source.atlas.AtlasOzoneResourceMapper.*;
import static org.apache.ranger.tagsync.source.atlas.AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME;
Expand All @@ -37,13 +39,24 @@ public class TestOzoneResourceMapper {
private static final String KEY_QUALIFIED_NAME = "o3fs://mybucket.myvolume.ozone1/mykey.txt@cl1" ;
private static final String KEY_PATH_QUALIFIED_NAME = "o3fs://mybucket.myvolume.ozone1/mykey/key1/@cl1";

private static final String VOLUME_QUALIFIED_NAME_OFS = "ofs://myvolume@cl1";
private static final String BUCKET_QUALIFIED_NAME_OFS = "ofs://myvolume.mybucket@cl1";
private static final String KEY_QUALIFIED_NAME_OFS = "ofs://ozone1/myvolume/mybucket/mykey.txt@cl1";
private static final String KEY_PATH_QUALIFIED_NAME_OFS = "ofs://ozone1/myvolume/mybucket/mykey/key1/@cl1";

private static final String SERVICE_NAME = "cl1_ozone";
private static final String VOLUME_NAME = "myvolume";
private static final String BUCKET_NAME = "mybucket";
private static final String KEY_NAME = "mykey.txt";
private static final String KEY_PATH = "mykey/key1/";

AtlasOzoneResourceMapper resourceMapper = new AtlasOzoneResourceMapper();

static AtlasOzoneResourceMapper resourceMapper = new AtlasOzoneResourceMapper();

@BeforeClass
public static void init(){
resourceMapper.initialize(new Properties());
}

@Test
public void testVolumeEntity() throws Exception {
Expand All @@ -54,7 +67,15 @@ public void testVolumeEntity() throws Exception {
assertResourceElementCount(resource, 1);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
}
@Test
public void testVolumeEntityOFS() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_VOLUME, VOLUME_QUALIFIED_NAME_OFS);
RangerServiceResource resource = resourceMapper.buildResource(entity);

Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 1);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
}
@Test
public void testBucketEntity() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_BUCKET, BUCKET_QUALIFIED_NAME);
Expand All @@ -65,7 +86,16 @@ public void testBucketEntity() throws Exception {
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
}
@Test
public void testBucketEntityOFS() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_BUCKET, BUCKET_QUALIFIED_NAME_OFS);
RangerServiceResource resource = resourceMapper.buildResource(entity);

Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 2);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
}
@Test
public void testKeyEntity() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_QUALIFIED_NAME);
Expand All @@ -77,7 +107,17 @@ public void testKeyEntity() throws Exception {
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_NAME);
}
@Test
public void testKeyEntityOFS() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_QUALIFIED_NAME_OFS);
RangerServiceResource resource = resourceMapper.buildResource(entity);

Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 3);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_NAME);
}
@Test
public void testKey2Entity() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_PATH_QUALIFIED_NAME);
Expand All @@ -89,7 +129,101 @@ public void testKey2Entity() throws Exception {
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_PATH);
}

@Test
public void testKey2EntityOFS() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_PATH_QUALIFIED_NAME_OFS);
RangerServiceResource resource = resourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 3);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_PATH);
}
@Test
public void testKeyEntityOFSLegacyDotDelimiter() throws Exception {
AtlasOzoneResourceMapper legacyResourceMapper = new AtlasOzoneResourceMapper();
Properties legacyProperties = new Properties();
legacyProperties.setProperty(PROP_LEGACY_PARSING, "true");
legacyResourceMapper.initialize(legacyProperties);
String qualifiedName = "ofs://mybucket.myvolume.ozone1/mykey.txt@cl1";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, qualifiedName);
RangerServiceResource resource = legacyResourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 3);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_NAME);
}
@Test
public void testInvalidKeyEntityOFSLegacyDotDelimiter() throws Exception {
AtlasOzoneResourceMapper legacyResourceMapper = new AtlasOzoneResourceMapper();
Properties legacyProperties = new Properties();
legacyProperties.setProperty(PROP_LEGACY_PARSING, "true");
legacyResourceMapper.initialize(legacyProperties);
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_PATH_QUALIFIED_NAME_OFS);
try {
RangerServiceResource resource = legacyResourceMapper.buildResource(entity);
Assert.assertFalse("Expected buildResource() to fail. But it returned " + resource+". "
+ "'/' not supported as delimiter when legacy flag is enabled", true);
} catch (Exception excp) {
System.out.println("Exception was as expected: "+ KEY_PATH_QUALIFIED_NAME_OFS+
" cannot be parsed when property"+ PROP_LEGACY_PARSING +" is true");
}
}
@Test
public void testVolumeEntityWithDotOFS() throws Exception {
String qualifiedName = "ofs://myvolume.volpostfix@cl1";
String expectedVolumeName = "myvolume.volpostfix";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_VOLUME, qualifiedName);
RangerServiceResource resource = resourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 1);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, expectedVolumeName);
}
@Test
public void testBucketEntityWithDotOFS() throws Exception {
String qualifiedName = "ofs://myvolume.bucketprefix.mybucket.bucketpostfix@cl1";
String expectedVolumeName = "myvolume";
String expectedBucketName = "bucketprefix.mybucket.bucketpostfix";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_BUCKET, qualifiedName);
RangerServiceResource resource = resourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 2);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, expectedVolumeName);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, expectedBucketName);
}
@Test
public void testKeyEntityWithDotOFS() throws Exception {
String qualifiedName = "ofs://ozone1/myvolume.volumepostfix/mybucket.bucketpostfix/keypath/keyprefix.mykey.txt@cl1";
String expectedVolumeName = "myvolume.volumepostfix";
String expectedBucketName = "mybucket.bucketpostfix";
String expectedKeyName = "keypath/keyprefix.mykey.txt";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, qualifiedName);
RangerServiceResource resource = resourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 3);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, expectedVolumeName);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, expectedBucketName);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, expectedKeyName);
}
@Test
public void testBucketEntityWithSlashOFS() throws Exception {
//future work : scenario when atlas fixes the bucket delimiter from "." to "/";
AtlasOzoneResourceMapper afterDelimiterFixResourceMapper = new AtlasOzoneResourceMapper();
Properties properties = new Properties();
properties.setProperty(PROP_OFS_BUCKET_DELIMITER, "/");
afterDelimiterFixResourceMapper.initialize(properties);
//both volume and bucket name could have a "." in it after this fix
String qualifiedName = "ofs://myvolume.volumepostfix/mybucket.bucketpostfix@cl1";
String expectedVolumeName = "myvolume.volumepostfix";
String expectedBucketName = "mybucket.bucketpostfix";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_BUCKET, qualifiedName);
RangerServiceResource resource = afterDelimiterFixResourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 2);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, expectedVolumeName);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, expectedBucketName);
}
@Test
public void testInvalidEntityType() {
assertException(getEntity("Unknown", KEY_PATH_QUALIFIED_NAME), "unrecognized entity-type");
Expand Down

0 comments on commit bad1f24

Please sign in to comment.