Skip to content

Commit

Permalink
Allow passing custom record reader to be inited/closed in SegmentProc…
Browse files Browse the repository at this point in the history
…essorFramework (#12529)
  • Loading branch information
swaminathanmanish authored Mar 2, 2024
1 parent 60d23ca commit 9a8fa79
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
Expand Down Expand Up @@ -194,9 +195,11 @@ public void testRecordReaderFileConfigInit() throws Exception {
FileUtils.forceMkdir(workingDir);
ClassLoader classLoader = getClass().getClassLoader();
URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
RecordReaderFileConfig reader = new RecordReaderFileConfig(FileFormat.CSV,
new File(resource.toURI()),
RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.CSV, new File(resource.toURI()),
null, null);
RecordReaderFileConfig recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV,
new File(resource.toURI()),
null, null, recordReader);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").
setTimeColumnName("time").build();

Expand All @@ -208,13 +211,15 @@ public void testRecordReaderFileConfigInit() throws Exception {

SegmentProcessorConfig config =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader),
Collections.emptyList(), null);
SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir,
ImmutableList.of(recordReaderFileConfig), Collections.emptyList(), null);
List<File> outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 52);
// Verify reader is closed
assertEquals(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig(), true);
}

@Test
Expand Down Expand Up @@ -686,7 +691,7 @@ public void testConfigurableMapperOutputSize()
ClassLoader classLoader = getClass().getClassLoader();
URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
RecordReaderFileConfig recordReaderFileConfig =
new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null);
new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null, null);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
Schema schema =
Expand Down Expand Up @@ -738,7 +743,7 @@ public void testConfigurableMapperOutputSize()
// output size threshold configured).

expectedTotalDocsCount = 52;
recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null);
recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null, null);

segmentConfig = new SegmentConfig.Builder().setIntermediateFileSizeThreshold(19).setSegmentNamePrefix("testPrefix")
.setSegmentNamePostfix("testPostfix").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@


/**
* Wraps RecordReader info to instantiate a reader. Users can either pass in the
* RecordReader instance directly or the info required to initialize the RecordReader, so that the
* RecordReader can be initialized just when it's about to be used, which avoids early/eager
* initialization/memory allocation.
* Holder for all RecordReader configs. Manages the lifecycle of a RecordReader by initializing and closing it within
* the segment creation framework.
*/
public class RecordReaderFileConfig {
public final FileFormat _fileFormat;
Expand All @@ -44,20 +42,22 @@ public class RecordReaderFileConfig {

// Pass in the info needed to initialize the reader
public RecordReaderFileConfig(FileFormat fileFormat, File dataFile, Set<String> fieldsToRead,
@Nullable RecordReaderConfig recordReaderConfig) {
@Nullable RecordReaderConfig recordReaderConfig, @Nullable RecordReader recordReader) {
_fileFormat = fileFormat;
_dataFile = dataFile;
_fieldsToRead = fieldsToRead;
_recordReaderConfig = recordReaderConfig;
_recordReader = null;
// This is not a delegate RecordReader i.e. RecordReaderFileConfig owns the RecordReader, so it should be closed
// by RecordReaderFileConfig as well.
// Users can pass in custom readers
_recordReader = recordReader;
// RecordReaderFileConfig owns the lifecycle of RecordReader, to be inited and closed.
_isDelegateReader = false;
_isRecordReaderInitialized = false;
_isRecordReaderClosed = false;
}

// Pass in the reader instance directly
// Kept for backward compatibility. Prefer the constructor above, so that the lifecycle of the reader is managed
// internally (initialized/closed) by SegmentProcessorFramework.
@Deprecated
public RecordReaderFileConfig(RecordReader recordReader) {
_recordReader = recordReader;
_fileFormat = null;
Expand All @@ -76,7 +76,12 @@ public RecordReaderFileConfig(RecordReader recordReader) {
public RecordReader getRecordReader()
throws Exception {
if (!_isRecordReaderInitialized) {
_recordReader = RecordReaderFactory.getRecordReader(_fileFormat, _dataFile, _fieldsToRead, _recordReaderConfig);
if (_recordReader == null) {
// Record reader instance to be created and inited
_recordReader = RecordReaderFactory.getRecordReader(_fileFormat, _dataFile, _fieldsToRead, _recordReaderConfig);
} else {
_recordReader.init(_dataFile, _fieldsToRead, _recordReaderConfig);
}
_isRecordReaderInitialized = true;
}
return _recordReader;
Expand Down

0 comments on commit 9a8fa79

Please sign in to comment.