Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RANGER-4914: Tagsync support for Ozone OFS paths #385

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class AtlasOzoneResourceMapper extends AtlasResourceMapper {
private static final Logger LOG = LoggerFactory.getLogger(AtlasOzoneResourceMapper.class);
Expand All @@ -51,9 +52,32 @@ public class AtlasOzoneResourceMapper extends AtlasResourceMapper {
private static final int IDX_CLUSTER_NAME = 3;
private static final int RESOURCE_COUNT = 4;

// This flag results in ofs atlas qualifiedName to parse paths similar to o3fs
public static final String PROP_LEGACY_PARSING = "ranger.tagsync.atlas.ozone.legacy.parsing.enabled";
public static final String PROP_OFS_KEY_DELIMITER = "ranger.tagsync.atlas.ozone.ofs.key_entity.separator";
public static final String PROP_OFS_BUCKET_DELIMITER = "ranger.tagsync.atlas.ozone.ofs.bucket_entity.separator";

private String ofsKeyDelimiter = "/";
private String ofsBucketDelimiter = "\\.";
private boolean legacyParsingEnabled = false;

public AtlasOzoneResourceMapper() {
super("ozone", SUPPORTED_ENTITY_TYPES);
}
@Override
public void initialize(Properties properties) {
super.initialize(properties);

if (this.properties != null) {
this.legacyParsingEnabled = Boolean.parseBoolean((String) this.properties.getOrDefault(PROP_LEGACY_PARSING, Boolean.toString(legacyParsingEnabled)));
this.ofsKeyDelimiter = (String) this.properties.getOrDefault(PROP_OFS_KEY_DELIMITER, this.ofsKeyDelimiter);
this.ofsBucketDelimiter = (String) this.properties.getOrDefault(PROP_OFS_BUCKET_DELIMITER, this.ofsBucketDelimiter);
}

LOG.info("ofsKeyDelimiter={}", this.ofsKeyDelimiter);
LOG.info("ofsBucketDelimiter={}", this.ofsBucketDelimiter);
LOG.info("legacyParsingEnabled={}",this.legacyParsingEnabled);
}

@Override
public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception {
Expand Down Expand Up @@ -134,8 +158,33 @@ public RangerServiceResource buildResource(final RangerAtlasEntity entity) throw
* o3fs://<volume name>@cm (ozone_key)
* o3fs://<volume name>.<bucket name>@<clusterName> (ozone_bucket)
* o3fs://<bucket name>.<volume name>.<ozone service id>/<key path>@<clusterName> (ozone_key)
* ofs://myvolume@cl1
* ofs://myvolume.mybucket@cl1
* ofs://ozone1/myvolume/mybucket/key1@cl1
* ofs://ozone1/myvolume/mybucket/mykey/key1/@cl1
*/
private String[] parseQualifiedName(String qualifiedName, String entityType) {
int idxProtocolSep = qualifiedName.indexOf(SEP_PROTOCOL);
String prefix = idxProtocolSep != -1 ? qualifiedName.substring(0, idxProtocolSep) : "";

if (LOG.isDebugEnabled()) {
LOG.debug("Prefix for qualifiedName={} is {}", qualifiedName, prefix);
}

if (this.legacyParsingEnabled){
return parseQualifiedNameO3FS(qualifiedName, entityType);
} else if (prefix.equals("ofs")) {
return parseQualifiedNameOFS(qualifiedName, entityType);
} else {
return parseQualifiedNameO3FS(qualifiedName, entityType);
}
}

private String[] parseQualifiedNameOFS(String qualifiedName, String entityType) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> parseQualifiedNameOFS(qualifiedName={}, entityType={})", qualifiedName, entityType);
}

String[] ret = new String[RESOURCE_COUNT];

if(StringUtils.isNotBlank(qualifiedName)) {
Expand All @@ -147,26 +196,88 @@ private String[] parseQualifiedName(String qualifiedName, String entityType) {
int idxProtocolSep = qualifiedName.indexOf(SEP_PROTOCOL);

if (idxProtocolSep != -1) {
int idxResourceStart = idxProtocolSep + SEP_PROTOCOL.length();
if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_VOLUME)) {
int idxResourceStart = idxProtocolSep + SEP_PROTOCOL.length();

if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_VOLUME)) { // ofs://vol1@cl1
ret[IDX_VOLUME] = qualifiedName.substring(idxResourceStart, idxClusterNameSep);
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_BUCKET)) {
String[] resources = qualifiedName.substring(idxResourceStart, idxClusterNameSep).split(QUALIFIED_NAME_DELIMITER);
ret[IDX_VOLUME] = resources.length > 0 ? resources[0] : null;
ret[IDX_BUCKET] = resources.length > 1 ? resources[1] : null;
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_KEY)) {
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_BUCKET)) { // ofs://vol1.buck1@cl1
// anything before first "." is volume name, after that is bucket name. So, "." in volume name is invalid when tagging buckets
String[] resources = qualifiedName.substring(idxResourceStart, idxClusterNameSep).split(this.ofsBucketDelimiter,2);

ret[IDX_VOLUME] = resources.length > 0 ? resources[0] : null;
ret[IDX_BUCKET] = resources.length > 1 ? resources[1] : null;
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_KEY)) { // ofs://svcid/vol1/buck1/d1/d2/key1@cl1
// This is a special case wherein the delimiter is a "/" instead of a "." in the qualifiedName in ofs path
idxResourceStart = qualifiedName.indexOf(this.ofsKeyDelimiter, idxProtocolSep + SEP_PROTOCOL.length()) + 1;

String resourceString = qualifiedName.substring(idxResourceStart, idxClusterNameSep);
String[] resources = resourceString.split(this.ofsKeyDelimiter, 3);

ret[IDX_VOLUME] = resources.length > 0 ? resources[0] : null;
ret[IDX_BUCKET] = resources.length > 1 ? resources[1] : null;
ret[IDX_KEY] = resources.length > 2 ? resources[2] : null;
}
}
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("<== parseQualifiedNameOFS(qualifiedName={}, entityType={}): volume={}, bucket={}, key={}, clusterName={}", qualifiedName, entityType, ret[IDX_VOLUME], ret[IDX_BUCKET], ret[IDX_KEY], ret[IDX_CLUSTER_NAME]);
}

return ret;
}

private String[] parseQualifiedNameO3FS(String qualifiedName, String entityType){
if (LOG.isDebugEnabled()) {
LOG.debug("==> parseQualifiedNameO3FS(qualifiedName={}, entityType={})", qualifiedName, entityType);
}

String[] ret = new String[RESOURCE_COUNT];

if(StringUtils.isNotBlank(qualifiedName)) {
int idxClusterNameSep = qualifiedName.lastIndexOf(CLUSTER_DELIMITER);

if (idxClusterNameSep != -1) {
ret[IDX_CLUSTER_NAME] = qualifiedName.substring(idxClusterNameSep + CLUSTER_DELIMITER.length());

int idxProtocolSep = qualifiedName.indexOf(SEP_PROTOCOL);

if (idxProtocolSep != -1) {
int idxResourceStart = idxProtocolSep + SEP_PROTOCOL.length();

if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_VOLUME)) { // o3fs://vol1@cl1
ret[IDX_VOLUME] = qualifiedName.substring(idxResourceStart, idxClusterNameSep);
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_BUCKET)) { // o3fs://vol1.buck1@cl1
String[] resources = qualifiedName.substring(idxResourceStart, idxClusterNameSep).split(QUALIFIED_NAME_DELIMITER);
ret[IDX_BUCKET] = resources.length > 0 ? resources[0] : null;
ret[IDX_VOLUME] = resources.length > 1 ? resources[1] : null;
int idxRelativePath = qualifiedName.indexOf(SEP_RELATIVE_PATH, idxResourceStart);
if (idxRelativePath != -1) {
ret[IDX_KEY] = qualifiedName.substring(idxRelativePath+1, idxClusterNameSep);

ret[IDX_VOLUME] = resources.length > 0 ? resources[0] : null;
ret[IDX_BUCKET] = resources.length > 1 ? resources[1] : null;
} else if (StringUtils.equals(entityType, ENTITY_TYPE_OZONE_KEY)) { // o3fs://buck1.vol1.svc1/d1/d2/key1@cl1
String[] resources = qualifiedName.substring(idxResourceStart, idxClusterNameSep).split(QUALIFIED_NAME_DELIMITER, 3);

ret[IDX_BUCKET] = resources.length > 0 ? resources[0] : null;
ret[IDX_VOLUME] = resources.length > 1 ? resources[1] : null;
ret[IDX_KEY] = resources.length > 2 ? resources[2] : null;

if (ret[IDX_KEY] != null) { // skip svcid
int idxKeySep = ret[IDX_KEY].indexOf(SEP_RELATIVE_PATH);

if (idxKeySep != -1) {
ret[IDX_KEY] = ret[IDX_KEY].substring(idxKeySep + SEP_RELATIVE_PATH.length());
} else {
ret[IDX_KEY] = null;
}
}
}
}
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("<== parseQualifiedNameO3FS(qualifiedName={}, entityType={}): volume={}, bucket={}, key={}, clusterName={}", qualifiedName, entityType, ret[IDX_VOLUME], ret[IDX_BUCKET], ret[IDX_KEY], ret[IDX_CLUSTER_NAME]);
}

return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.ranger.tagsync.source.atlas.AtlasOzoneResourceMapper;
import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Collections;
import java.util.Properties;

import static org.apache.ranger.tagsync.source.atlas.AtlasOzoneResourceMapper.*;
import static org.apache.ranger.tagsync.source.atlas.AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME;
Expand All @@ -37,13 +39,24 @@ public class TestOzoneResourceMapper {
private static final String KEY_QUALIFIED_NAME = "o3fs://mybucket.myvolume.ozone1/mykey.txt@cl1" ;
private static final String KEY_PATH_QUALIFIED_NAME = "o3fs://mybucket.myvolume.ozone1/mykey/key1/@cl1";

private static final String VOLUME_QUALIFIED_NAME_OFS = "ofs://myvolume@cl1";
private static final String BUCKET_QUALIFIED_NAME_OFS = "ofs://myvolume.mybucket@cl1";
private static final String KEY_QUALIFIED_NAME_OFS = "ofs://ozone1/myvolume/mybucket/mykey.txt@cl1";
private static final String KEY_PATH_QUALIFIED_NAME_OFS = "ofs://ozone1/myvolume/mybucket/mykey/key1/@cl1";

private static final String SERVICE_NAME = "cl1_ozone";
private static final String VOLUME_NAME = "myvolume";
private static final String BUCKET_NAME = "mybucket";
private static final String KEY_NAME = "mykey.txt";
private static final String KEY_PATH = "mykey/key1/";

AtlasOzoneResourceMapper resourceMapper = new AtlasOzoneResourceMapper();

static AtlasOzoneResourceMapper resourceMapper = new AtlasOzoneResourceMapper();

@BeforeClass
public static void init(){
resourceMapper.initialize(new Properties());
}

@Test
public void testVolumeEntity() throws Exception {
Expand All @@ -54,7 +67,15 @@ public void testVolumeEntity() throws Exception {
assertResourceElementCount(resource, 1);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
}
@Test
public void testVolumeEntityOFS() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_VOLUME, VOLUME_QUALIFIED_NAME_OFS);
RangerServiceResource resource = resourceMapper.buildResource(entity);

Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 1);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
}
@Test
public void testBucketEntity() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_BUCKET, BUCKET_QUALIFIED_NAME);
Expand All @@ -65,7 +86,16 @@ public void testBucketEntity() throws Exception {
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
}
@Test
public void testBucketEntityOFS() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_BUCKET, BUCKET_QUALIFIED_NAME_OFS);
RangerServiceResource resource = resourceMapper.buildResource(entity);

Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 2);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
}
@Test
public void testKeyEntity() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_QUALIFIED_NAME);
Expand All @@ -77,7 +107,17 @@ public void testKeyEntity() throws Exception {
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_NAME);
}
@Test
public void testKeyEntityOFS() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_QUALIFIED_NAME_OFS);
RangerServiceResource resource = resourceMapper.buildResource(entity);

Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 3);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_NAME);
}
@Test
public void testKey2Entity() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_PATH_QUALIFIED_NAME);
Expand All @@ -89,7 +129,101 @@ public void testKey2Entity() throws Exception {
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_PATH);
}

@Test
public void testKey2EntityOFS() throws Exception {
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_PATH_QUALIFIED_NAME_OFS);
RangerServiceResource resource = resourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 3);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_PATH);
}
@Test
public void testKeyEntityOFSLegacyDotDelimiter() throws Exception {
AtlasOzoneResourceMapper legacyResourceMapper = new AtlasOzoneResourceMapper();
Properties legacyProperties = new Properties();
legacyProperties.setProperty(PROP_LEGACY_PARSING, "true");
legacyResourceMapper.initialize(legacyProperties);
String qualifiedName = "ofs://mybucket.myvolume.ozone1/mykey.txt@cl1";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, qualifiedName);
RangerServiceResource resource = legacyResourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 3);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, VOLUME_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, BUCKET_NAME);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, KEY_NAME);
}
@Test
public void testInvalidKeyEntityOFSLegacyDotDelimiter() throws Exception {
AtlasOzoneResourceMapper legacyResourceMapper = new AtlasOzoneResourceMapper();
Properties legacyProperties = new Properties();
legacyProperties.setProperty(PROP_LEGACY_PARSING, "true");
legacyResourceMapper.initialize(legacyProperties);
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, KEY_PATH_QUALIFIED_NAME_OFS);
try {
RangerServiceResource resource = legacyResourceMapper.buildResource(entity);
Assert.assertFalse("Expected buildResource() to fail. But it returned " + resource+". "
+ "'/' not supported as delimiter when legacy flag is enabled", true);
} catch (Exception excp) {
System.out.println("Exception was as expected: "+ KEY_PATH_QUALIFIED_NAME_OFS+
" cannot be parsed when property"+ PROP_LEGACY_PARSING +" is true");
}
}
@Test
public void testVolumeEntityWithDotOFS() throws Exception {
String qualifiedName = "ofs://myvolume.volpostfix@cl1";
String expectedVolumeName = "myvolume.volpostfix";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_VOLUME, qualifiedName);
RangerServiceResource resource = resourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 1);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, expectedVolumeName);
}
@Test
public void testBucketEntityWithDotOFS() throws Exception {
String qualifiedName = "ofs://myvolume.bucketprefix.mybucket.bucketpostfix@cl1";
String expectedVolumeName = "myvolume";
String expectedBucketName = "bucketprefix.mybucket.bucketpostfix";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_BUCKET, qualifiedName);
RangerServiceResource resource = resourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 2);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, expectedVolumeName);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, expectedBucketName);
}
@Test
public void testKeyEntityWithDotOFS() throws Exception {
String qualifiedName = "ofs://ozone1/myvolume.volumepostfix/mybucket.bucketpostfix/keypath/keyprefix.mykey.txt@cl1";
String expectedVolumeName = "myvolume.volumepostfix";
String expectedBucketName = "mybucket.bucketpostfix";
String expectedKeyName = "keypath/keyprefix.mykey.txt";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_KEY, qualifiedName);
RangerServiceResource resource = resourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 3);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, expectedVolumeName);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, expectedBucketName);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_KEY, expectedKeyName);
}
@Test
public void testBucketEntityWithSlashOFS() throws Exception {
//future work : scenario when atlas fixes the bucket delimiter from "." to "/";
AtlasOzoneResourceMapper afterDelimiterFixResourceMapper = new AtlasOzoneResourceMapper();
Properties properties = new Properties();
properties.setProperty(PROP_OFS_BUCKET_DELIMITER, "/");
afterDelimiterFixResourceMapper.initialize(properties);
//both volume and bucket name could have a "." in it after this fix
String qualifiedName = "ofs://myvolume.volumepostfix/mybucket.bucketpostfix@cl1";
String expectedVolumeName = "myvolume.volumepostfix";
String expectedBucketName = "mybucket.bucketpostfix";
RangerAtlasEntity entity = getEntity(ENTITY_TYPE_OZONE_BUCKET, qualifiedName);
RangerServiceResource resource = afterDelimiterFixResourceMapper.buildResource(entity);
Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
assertResourceElementCount(resource, 2);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_VOLUME, expectedVolumeName);
assertResourceElementValue(resource, RANGER_TYPE_OZONE_BUCKET, expectedBucketName);
}
@Test
public void testInvalidEntityType() {
assertException(getEntity("Unknown", KEY_PATH_QUALIFIED_NAME), "unrecognized entity-type");
Expand Down