From 2feb26a3bcb6a20a3c1c806217be4911944a4cb9 Mon Sep 17 00:00:00 2001 From: Matt Casters Date: Wed, 12 Feb 2020 17:16:59 +0100 Subject: [PATCH] issues #56, #52, #53 --- .../core/partition/SinglePartitionFn.java | 13 + .../core/transform/StepBatchTransform.java | 711 ++++++++++++++++++ .../beam/core/transform/StepTransform.java | 227 ++---- .../messages/messages_en_US.properties | 2 +- .../kettle/beam/perspective/BeamHelper.java | 30 + .../beam/perspective/BeamSpoonPlugin.java | 6 +- .../perspective/TransDrawExtensionPoint.java | 46 ++ .../handler/BeamGenericStepHandler.java | 66 +- .../beam/steps/kafka/BeamConsumeMeta.java | 2 +- .../java/org/kettle/beam/util/BeamConst.java | 4 + .../resources/beam_transgraph_overlays.xul | 26 + 11 files changed, 947 insertions(+), 186 deletions(-) create mode 100644 src/main/java/org/kettle/beam/core/partition/SinglePartitionFn.java create mode 100644 src/main/java/org/kettle/beam/core/transform/StepBatchTransform.java create mode 100644 src/main/java/org/kettle/beam/perspective/TransDrawExtensionPoint.java create mode 100755 src/main/resources/beam_transgraph_overlays.xul diff --git a/src/main/java/org/kettle/beam/core/partition/SinglePartitionFn.java b/src/main/java/org/kettle/beam/core/partition/SinglePartitionFn.java new file mode 100644 index 0000000..127f99d --- /dev/null +++ b/src/main/java/org/kettle/beam/core/partition/SinglePartitionFn.java @@ -0,0 +1,13 @@ +package org.kettle.beam.core.partition; + +import org.apache.beam.sdk.transforms.Partition; +import org.kettle.beam.core.KettleRow; + +public class SinglePartitionFn implements Partition.PartitionFn { + + private static final long serialVersionUID = 95100000000000001L; + + @Override public int partitionFor( KettleRow elem, int numPartitions ) { + return 0; + } +} diff --git a/src/main/java/org/kettle/beam/core/transform/StepBatchTransform.java b/src/main/java/org/kettle/beam/core/transform/StepBatchTransform.java new file mode 100644 index 0000000..3c01c38 --- /dev/null +++ b/src/main/java/org/kettle/beam/core/transform/StepBatchTransform.java @@ -0,0 +1,711 @@ +package org.kettle.beam.core.transform; + +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.commons.lang.StringUtils; +import org.joda.time.Instant; +import org.kettle.beam.core.BeamKettle; +import org.kettle.beam.core.KettleRow; +import org.kettle.beam.core.metastore.SerializableMetaStore; +import org.kettle.beam.core.shared.VariableValue; +import org.kettle.beam.core.util.JsonRowMeta; +import org.kettle.beam.core.util.KettleBeamUtil; +import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.exception.KettleStepException; +import org.pentaho.di.core.logging.LogLevel; +import org.pentaho.di.core.plugins.PluginRegistry; +import org.pentaho.di.core.plugins.StepPluginType; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.row.ValueMetaInterface; +import org.pentaho.di.trans.RowProducer; +import org.pentaho.di.trans.SingleThreadedTransExecutor; +import org.pentaho.di.trans.Trans; +import org.pentaho.di.trans.TransHopMeta; +import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.RowAdapter; +import org.pentaho.di.trans.step.RowListener; +import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.trans.step.StepMetaDataCombi; +import org.pentaho.di.trans.step.StepMetaInterface; +import org.pentaho.di.trans.steps.dummytrans.DummyTransMeta; +import org.pentaho.di.trans.steps.injector.InjectorMeta; +import org.pentaho.metastore.api.IMetaStore; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +public class StepBatchTransform extends StepTransform { + + public StepBatchTransform() { + super(); + } + + public StepBatchTransform( List variableValues, String metastoreJson, List stepPluginClasses, List xpPluginClasses, + int batchSize, int flushIntervalMs, String stepname, String stepPluginId, String stepMetaInterfaceXml, String inputRowMetaJson, boolean inputStep, + List targetSteps, List infoSteps, List infoRowMetaJsons, List>> infoCollectionViews ) { + super(variableValues, metastoreJson, stepPluginClasses, xpPluginClasses, batchSize, flushIntervalMs, stepname, stepPluginId, + stepMetaInterfaceXml, inputRowMetaJson, inputStep, targetSteps, infoSteps, infoRowMetaJsons, infoCollectionViews); + } + + @Override public PCollectionTuple expand( PCollection input ) { + try { + // Only initialize once on this node/vm + // + BeamKettle.init( stepPluginClasses, xpPluginClasses ); + + // Similar for the output : treate a TupleTag list for the target steps... + // + TupleTag mainOutputTupleTag = new TupleTag( KettleBeamUtil.createMainOutputTupleId( stepname ) ) { + }; + List> targetTupleTags = new ArrayList<>(); + TupleTagList targetTupleTagList = null; + for ( String targetStep : targetSteps ) { + String tupleId = KettleBeamUtil.createTargetTupleId( stepname, targetStep ); + TupleTag tupleTag = new TupleTag( tupleId ) { + }; + targetTupleTags.add( tupleTag ); + if ( targetTupleTagList == null ) { + targetTupleTagList = TupleTagList.of( tupleTag ); + } else { + targetTupleTagList = targetTupleTagList.and( tupleTag ); + } + } + if ( targetTupleTagList == null ) { + targetTupleTagList = TupleTagList.empty(); + } + + // Create a new step function, initializes the step + // + StepBatchFn stepBatchFn = new StepBatchFn( variableValues, metastoreJson, stepPluginClasses, xpPluginClasses, + stepname, stepPluginId, stepMetaInterfaceXml, inputRowMetaJson, inputStep, + targetSteps, infoSteps, infoRowMetaJsons ); + + // The actual step functionality + // + ParDo.SingleOutput parDoStepFn = ParDo.of( stepBatchFn ); + + // Add optional side inputs... + // + if ( infoCollectionViews.size() > 0 ) { + parDoStepFn = parDoStepFn.withSideInputs( infoCollectionViews ); + } + + // Specify the main output and targeted outputs + // + ParDo.MultiOutput multiOutput = parDoStepFn.withOutputTags( mainOutputTupleTag, targetTupleTagList ); + + // Apply the multi output parallel do step function to the main input stream + // + PCollectionTuple collectionTuple = input.apply( multiOutput ); + + // In the tuple is everything we need to find. + // Just make sure to retrieve the PCollections using the correct Tuple ID + // Use KettleBeamUtil.createTargetTupleId()... to make sure + // + return collectionTuple; + } catch ( Exception e ) { + numErrors.inc(); + LOG.error( "Error transforming data in step '" + stepname + "'", e ); + throw new RuntimeException( "Error transforming data in step", e ); + } + + } + + private class StepBatchFn extends DoFn { + + private static final long serialVersionUID = 95700000000000002L; + + public static final String INJECTOR_STEP_NAME = "_INJECTOR_"; + + protected List variableValues; + protected String metastoreJson; + protected List stepPluginClasses; + protected List xpPluginClasses; + protected String stepname; + protected String stepPluginId; + protected String stepMetaInterfaceXml; + protected String inputRowMetaJson; + protected List targetSteps; + protected List infoSteps; + protected List infoRowMetaJsons; + protected boolean inputStep; + protected boolean initialize; + + protected List> infoCollections; + + // Log and count parse errors. + private final Counter numErrors = Metrics.counter( "main", "StepProcessErrors" ); + + private transient TransMeta transMeta; + private transient StepMeta stepMeta; + private transient RowMetaInterface inputRowMeta; + private transient RowMetaInterface outputRowMeta; + private transient List stepCombis; + private transient Trans trans; + private transient RowProducer rowProducer; + private transient RowListener rowListener; + private transient List resultRows; + private transient List> targetResultRowsList; + private transient List targetRowMetas; + private transient List infoRowMetas; + private transient List infoRowProducers; + + private transient TupleTag mainTupleTag; + private transient List> tupleTagList; + + private transient Counter initCounter; + private transient Counter readCounter; + private transient Counter writtenCounter; + private transient Counter flushBufferCounter; + + private transient SingleThreadedTransExecutor executor; + + private transient Queue rowBuffer; + private transient BoundedWindow batchWindow; + + private transient AtomicLong lastTimerCheck; + private transient Timer timer; + + public StepBatchFn() { + } + + + // I created a private class because instances of this one need access to infoCollectionViews + // + + public StepBatchFn( List variableValues, String metastoreJson, List stepPluginClasses, List xpPluginClasses, String stepname, String stepPluginId, + String stepMetaInterfaceXml, String inputRowMetaJson, boolean inputStep, + List targetSteps, List infoSteps, List infoRowMetaJsons ) { + this(); + this.variableValues = variableValues; + this.metastoreJson = metastoreJson; + this.stepPluginClasses = stepPluginClasses; + this.xpPluginClasses = xpPluginClasses; + this.stepname = stepname; + this.stepPluginId = stepPluginId; + this.stepMetaInterfaceXml = stepMetaInterfaceXml; + this.inputRowMetaJson = inputRowMetaJson; + this.inputStep = inputStep; + this.targetSteps = targetSteps; + this.infoSteps = infoSteps; + this.infoRowMetaJsons = infoRowMetaJsons; + this.initialize = true; + } + + /** + * Reset the row buffer every time we start a new bundle to prevent the output of double rows + * + * @param startBundleContext + */ + @StartBundle + public void startBundle( StartBundleContext startBundleContext ) { + Metrics.counter( "startBundle", stepname ).inc(); + if ( "ScriptValueMod".equals( stepPluginId ) && trans != null ) { + initialize = true; + } + } + + @Setup + public void setup() { + try { + rowBuffer = new ConcurrentLinkedQueue(); + } catch ( Exception e ) { + numErrors.inc(); + LOG.info( "Step '" + stepname + "' : setup error :" + e.getMessage() ); + throw new RuntimeException( "Unable to set up step " + stepname, e ); + } + } + + @Teardown + public void tearDown() { + if ( timer != null ) { + timer.cancel(); + } + } + + @ProcessElement + public void processElement( ProcessContext context, BoundedWindow window ) { + + try { + + if ( initialize ) { + initialize = false; + + // Initialize Kettle and load extra plugins as well + // + BeamKettle.init( stepPluginClasses, xpPluginClasses ); + + // The content of the metastore is JSON serialized and inflated below. + // + IMetaStore metaStore = new SerializableMetaStore( metastoreJson ); + + // Create a very simple new transformation to run single threaded... + // Single threaded... + // + transMeta = new TransMeta(); + transMeta.setMetaStore( metaStore ); + + // When the first row ends up in the buffer we start the timer. + // If the rows are flushed out we reset back to -1; + // + lastTimerCheck = new AtomicLong( -1L ); + + // Give steps variables from above + // + for ( VariableValue variableValue : variableValues ) { + if ( StringUtils.isNotEmpty( variableValue.getVariable() ) ) { + transMeta.setVariable( variableValue.getVariable(), variableValue.getValue() ); + } + } + + // Input row metadata... + // + inputRowMeta = JsonRowMeta.fromJson( inputRowMetaJson ); + infoRowMetas = new ArrayList<>(); + for ( String infoRowMetaJson : infoRowMetaJsons ) { + RowMetaInterface infoRowMeta = JsonRowMeta.fromJson( infoRowMetaJson ); + infoRowMetas.add( infoRowMeta ); + } + + // Create an Injector step with the right row layout... + // This will help all steps see the row layout statically... + // + StepMeta mainInjectorStepMeta = null; + if ( !inputStep ) { + mainInjectorStepMeta = createInjectorStep( transMeta, INJECTOR_STEP_NAME, inputRowMeta, 200, 200 ); + } + + // Our main step writes to a bunch of targets + // Add a dummy step for each one so the step can target them + // + int targetLocationY = 200; + List targetStepMetas = new ArrayList<>(); + for ( String targetStep : targetSteps ) { + DummyTransMeta dummyMeta = new DummyTransMeta(); + StepMeta targetStepMeta = new StepMeta( targetStep, dummyMeta ); + targetStepMeta.setLocation( 600, targetLocationY ); + targetStepMeta.setDraw( true ); + targetLocationY += 150; + + targetStepMetas.add( targetStepMeta ); + transMeta.addStep( targetStepMeta ); + } + + // The step might read information from info steps + // Steps like "Stream Lookup" or "Validator" + // They read all the data on input from a side input + // + List> infoDataSets = new ArrayList<>(); + List infoStepMetas = new ArrayList<>(); + for ( int i = 0; i < infoSteps.size(); i++ ) { + String infoStep = infoSteps.get( i ); + PCollectionView> cv = infoCollectionViews.get( i ); + + // Get the data from the side input, from the info step(s) + // + List infoDataSet = context.sideInput( cv ); + infoDataSets.add( infoDataSet ); + + RowMetaInterface infoRowMeta = infoRowMetas.get( i ); + + // Add an Injector step for every info step so the step can read from it + // + StepMeta infoStepMeta = createInjectorStep( transMeta, infoStep, infoRowMeta, 200, 350 + 150 * i ); + infoStepMetas.add( infoStepMeta ); + } + + stepCombis = new ArrayList<>(); + + // The main step inflated from XML metadata... + // + PluginRegistry registry = PluginRegistry.getInstance(); + StepMetaInterface stepMetaInterface = registry.loadClass( StepPluginType.class, stepPluginId, StepMetaInterface.class ); + if ( stepMetaInterface == null ) { + throw new KettleException( "Unable to load step plugin with ID " + stepPluginId + ", this plugin isn't in the plugin registry or classpath" ); + } + + KettleBeamUtil.loadStepMetadataFromXml( stepname, stepMetaInterface, stepMetaInterfaceXml, transMeta.getMetaStore() ); + + stepMeta = new StepMeta( stepname, stepMetaInterface ); + stepMeta.setStepID( stepPluginId ); + stepMeta.setLocation( 400, 200 ); + stepMeta.setDraw( true ); + transMeta.addStep( stepMeta ); + if ( !inputStep ) { + transMeta.addTransHop( new TransHopMeta( mainInjectorStepMeta, stepMeta ) ); + } + // The target hops as well + // + for ( StepMeta targetStepMeta : targetStepMetas ) { + transMeta.addTransHop( new TransHopMeta( stepMeta, targetStepMeta ) ); + } + + // And the info hops... + // + for ( StepMeta infoStepMeta : infoStepMetas ) { + transMeta.addTransHop( new TransHopMeta( infoStepMeta, stepMeta ) ); + } + + stepMetaInterface.searchInfoAndTargetSteps( transMeta.getSteps() ); + + // This one is single threaded folks + // + transMeta.setTransformationType( TransMeta.TransformationType.SingleThreaded ); + + // Create the transformation... + // + trans = new Trans( transMeta ); + trans.setLogLevel( LogLevel.ERROR ); + trans.setMetaStore( transMeta.getMetaStore() ); + trans.prepareExecution( null ); + + // Create producers so we can efficiently pass data + // + rowProducer = null; + if ( !inputStep ) { + rowProducer = trans.addRowProducer( INJECTOR_STEP_NAME, 0 ); + } + infoRowProducers = new ArrayList<>(); + for ( String infoStep : infoSteps ) { + RowProducer infoRowProducer = trans.addRowProducer( infoStep, 0 ); + infoRowProducers.add( infoRowProducer ); + } + + // Find the right interfaces for execution later... + // + if ( !inputStep ) { + StepMetaDataCombi injectorCombi = findCombi( trans, INJECTOR_STEP_NAME ); + stepCombis.add( injectorCombi ); + } + + StepMetaDataCombi stepCombi = findCombi( trans, stepname ); + stepCombis.add( stepCombi ); + outputRowMeta = transMeta.getStepFields( stepname ); + + if ( targetSteps.isEmpty() ) { + rowListener = new RowAdapter() { + @Override public void rowWrittenEvent( RowMetaInterface rowMeta, Object[] row ) throws KettleStepException { + resultRows.add( row ); + } + }; + stepCombi.step.addRowListener( rowListener ); + } + + // Create a list of TupleTag to direct the target rows + // + mainTupleTag = new TupleTag( KettleBeamUtil.createMainOutputTupleId( stepname ) ) { + }; + tupleTagList = new ArrayList<>(); + + // The lists in here will contain all the rows that ended up in the various target steps (if any) + // + targetRowMetas = new ArrayList<>(); + targetResultRowsList = new ArrayList<>(); + + for ( String targetStep : targetSteps ) { + StepMetaDataCombi targetCombi = findCombi( trans, targetStep ); + stepCombis.add( targetCombi ); + targetRowMetas.add( transMeta.getStepFields( stepCombi.stepname ) ); + + String tupleId = KettleBeamUtil.createTargetTupleId( stepname, targetStep ); + TupleTag tupleTag = new TupleTag( tupleId ) { + }; + tupleTagList.add( tupleTag ); + final List targetResultRows = new ArrayList<>(); + targetResultRowsList.add( targetResultRows ); + + targetCombi.step.addRowListener( new RowAdapter() { + @Override public void rowReadEvent( RowMetaInterface rowMeta, Object[] row ) throws KettleStepException { + // We send the target row to a specific list... + // + targetResultRows.add( row ); + } + } ); + } + + executor = new SingleThreadedTransExecutor( trans ); + + // Initialize the steps... + // + executor.init(); + + initCounter = Metrics.counter( "init", stepname ); + readCounter = Metrics.counter( "read", stepname ); + writtenCounter = Metrics.counter( "written", stepname ); + flushBufferCounter = Metrics.counter( "flushBuffer", stepname ); + + initCounter.inc(); + + // Doesn't really start the threads in single threaded mode + // Just sets some flags all over the place + // + trans.startThreads(); + + resultRows = new ArrayList<>(); + + // Copy the info data sets to the info steps... + // We do this only once so all subsequent rows can use this. + // + for ( int i = 0; i < infoSteps.size(); i++ ) { + RowProducer infoRowProducer = infoRowProducers.get( i ); + List infoDataSet = infoDataSets.get( i ); + StepMetaDataCombi combi = findCombi( trans, infoSteps.get( i ) ); + RowMetaInterface infoRowMeta = infoRowMetas.get( i ); + + // Pass and process the rows in the info steps + // + for ( KettleRow infoRowData : infoDataSet ) { + infoRowProducer.putRow( infoRowMeta, infoRowData.getRow() ); + combi.step.processRow( combi.meta, combi.data ); + } + + // By calling finished() steps like Stream Lookup know no more rows are going to come + // and they can start to work with the info data set + // + infoRowProducer.finished(); + + // Call once more to flag input as done, step as finished. + // + combi.step.processRow( combi.meta, combi.data ); + } + + // Install a timer to check every second if the buffer is stale and needs to be flushed... + // + if ( flushIntervalMs > 0 ) { + TimerTask timerTask = new TimerTask() { + @Override public void run() { + // Check on the state of the buffer, flush if needed... + // + synchronized ( rowBuffer ) { + long difference = System.currentTimeMillis() - lastTimerCheck.get(); + if ( lastTimerCheck.get()<=0 || difference > flushIntervalMs ) { + try { + emptyRowBuffer( new StepProcessContext( context ) ); + } catch ( Exception e ) { + throw new RuntimeException( "Unable to flush row buffer when it got stale after " + difference + " ms", e ); + } + lastTimerCheck.set( System.currentTimeMillis() ); + } + } + } + }; + timer = new Timer( "Flush timer of step " + stepname ); + timer.schedule( timerTask, 100, 100 ); + } + } + + // Get one row from the context main input and make a copy so we can change it. + // + KettleRow originalInputRow = context.element(); + KettleRow inputRow = KettleBeamUtil.copyKettleRow( originalInputRow, inputRowMeta ); + readCounter.inc(); + + // Take care of the age of the buffer... + // + if ( flushIntervalMs > 0 && rowBuffer.isEmpty() ) { + lastTimerCheck.set( System.currentTimeMillis() ); + } + + // Add the row to the buffer. + // + synchronized ( rowBuffer ) { + rowBuffer.add( inputRow ); + batchWindow = window; + + synchronized ( rowBuffer ) { + if ( rowBuffer.size() >= batchSize ) { + emptyRowBuffer( new StepProcessContext( context ) ); + } + } + } + } catch ( Exception e ) { + numErrors.inc(); + LOG.info( "Step execution error :" + e.getMessage() ); + throw new RuntimeException( "Error executing StepBatchFn", e ); + } + } + + @FinishBundle + public void finishBundle( FinishBundleContext context ) { + try { + synchronized ( rowBuffer ) { + if ( !rowBuffer.isEmpty() ) { + // System.out.println( "Finishing bundle with " + rowBuffer.size() + " rows in the buffer" ); + emptyRowBuffer( new StepFinishBundleContext( context, batchWindow ) ); + } + } + } catch ( Exception e ) { + numErrors.inc(); + LOG.info( "Step finishing bundle error :" + e.getMessage() ); + throw new RuntimeException( "Error finalizing bundle of step '" + stepname + "'", e ); + } + } + + private transient int maxInputBufferSize = 0; + private transient int minInputBufferSize = Integer.MAX_VALUE; + + /** + * Attempt to empty the row buffer + * + * @param context + * @throws KettleException + */ + private synchronized void emptyRowBuffer( TupleOutputContext context ) throws KettleException { + synchronized ( rowBuffer ) { + + List buffer = new ArrayList<>(); + + // Copy the data to avoid race conditions + // + int size = rowBuffer.size(); + for ( int i = 0; i < size; i++ ) { + KettleRow kettleRow = rowBuffer.poll(); + buffer.add( kettleRow ); + } + + // Only do something if we have work to do + // + if ( buffer.isEmpty() ) { + return; + } + + if ( !rowBuffer.isEmpty() ) { + System.err.println( "Async action detected on rowBuffer" ); + } + + // Empty all the row buffers for another iteration + // + resultRows.clear(); + for ( int t = 0; t < targetSteps.size(); t++ ) { + targetResultRowsList.get( t ).clear(); + } + + // Pass the rows in the rowBuffer to the input RowSet + // + if ( !inputStep ) { + int bufferSize = buffer.size(); + if ( maxInputBufferSize < bufferSize ) { + Metrics.counter( "maxInputSize", stepname ).inc( bufferSize - maxInputBufferSize ); + maxInputBufferSize = bufferSize; + } + if ( minInputBufferSize > bufferSize ) { + if ( minInputBufferSize == Integer.MAX_VALUE ) { + Metrics.counter( "minInputSize", stepname ).inc( bufferSize ); + } else { + Metrics.counter( "minInputSize", stepname ).dec( bufferSize - minInputBufferSize ); + } + minInputBufferSize = bufferSize; + } + + for ( KettleRow inputRow : buffer ) { + rowProducer.putRow( inputRowMeta, inputRow.getRow() ); + } + } + + // Execute all steps in the transformation + // + executor.oneIteration(); + + // Evaluate the results... + // + + // Pass all rows in the output to the process context + // + for ( Object[] resultRow : resultRows ) { + + // Pass the row to the process context + // + context.output( mainTupleTag, new KettleRow( resultRow ) ); + writtenCounter.inc(); + } + + // Pass whatever ended up on the target nodes + // + for ( int t = 0; t < targetResultRowsList.size(); t++ ) { + List targetRowsList = targetResultRowsList.get( t ); + TupleTag tupleTag = tupleTagList.get( t ); + + for ( Object[] targetRow : targetRowsList ) { + context.output( tupleTag, new KettleRow( targetRow ) ); + } + } + + flushBufferCounter.inc(); + buffer.clear(); // gc + lastTimerCheck.set( System.currentTimeMillis() ); // No need to check sooner + } + } + + private StepMeta createInjectorStep( TransMeta transMeta, String injectorStepName, RowMetaInterface injectorRowMeta, int x, int y ) { + InjectorMeta injectorMeta = new InjectorMeta(); + injectorMeta.allocate( injectorRowMeta.size() ); + for ( int i = 0; i < injectorRowMeta.size(); i++ ) { + ValueMetaInterface valueMeta = injectorRowMeta.getValueMeta( i ); + injectorMeta.getFieldname()[ i ] = valueMeta.getName(); + injectorMeta.getType()[ i ] = valueMeta.getType(); + injectorMeta.getLength()[ i ] = valueMeta.getLength(); + injectorMeta.getPrecision()[ i ] = valueMeta.getPrecision(); + } + StepMeta injectorStepMeta = new StepMeta( injectorStepName, injectorMeta ); + injectorStepMeta.setLocation( x, y ); + injectorStepMeta.setDraw( true ); + transMeta.addStep( injectorStepMeta ); + + return injectorStepMeta; + } + + private StepMetaDataCombi findCombi( Trans trans, String stepname ) { + for ( StepMetaDataCombi combi : trans.getSteps() ) { + if ( combi.stepname.equals( stepname ) ) { + return combi; + } + } + throw new RuntimeException( "Configuration error, step '" + stepname + "' not found in transformation" ); + } + } + + private interface TupleOutputContext { + void output( TupleTag tupleTag, T output ); + } + + private class StepProcessContext implements TupleOutputContext { + + private DoFn.ProcessContext context; + + public StepProcessContext( DoFn.ProcessContext processContext ) { + this.context = processContext; + } + + @Override public void output( TupleTag tupleTag, KettleRow output ) { + context.output( tupleTag, output ); + } + } + + private class StepFinishBundleContext implements TupleOutputContext { + + private DoFn.FinishBundleContext context; + private BoundedWindow batchWindow; + + public StepFinishBundleContext( DoFn.FinishBundleContext context, BoundedWindow batchWindow ) { + this.context = context; + this.batchWindow = batchWindow; + } + + @Override public void output( TupleTag tupleTag, KettleRow output ) { + context.output( tupleTag, output, Instant.now(), batchWindow ); + } + } +} diff --git a/src/main/java/org/kettle/beam/core/transform/StepTransform.java b/src/main/java/org/kettle/beam/core/transform/StepTransform.java index 4f2631c..6d73edf 100644 --- a/src/main/java/org/kettle/beam/core/transform/StepTransform.java +++ b/src/main/java/org/kettle/beam/core/transform/StepTransform.java @@ -44,10 +44,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; public class StepTransform extends PTransform, PCollectionTuple> { @@ -64,29 +60,29 @@ public class StepTransform extends PTransform, PCollectio protected List targetSteps; protected List infoSteps; protected List infoRowMetaJsons; - protected int flushIntervalSeconds; + protected int flushIntervalMs; // Used in the private StepFn class below // protected List>> infoCollectionViews; // Log and count errors. - private static final Logger LOG = LoggerFactory.getLogger( StepTransform.class ); - private static final Counter numErrors = Metrics.counter( "main", "StepErrors" ); + protected static final Logger LOG = LoggerFactory.getLogger( StepTransform.class ); + protected static final Counter numErrors = Metrics.counter( "main", "StepErrors" ); public StepTransform() { variableValues = new ArrayList<>(); } public StepTransform( List variableValues, String metastoreJson, List stepPluginClasses, List xpPluginClasses, - int batchSize, int flushIntervalSeconds, String stepname, String stepPluginId, String stepMetaInterfaceXml, String inputRowMetaJson, boolean inputStep, + int batchSize, int flushIntervalMs, String stepname, String stepPluginId, String stepMetaInterfaceXml, String inputRowMetaJson, boolean inputStep, List targetSteps, List infoSteps, List infoRowMetaJsons, List>> infoCollectionViews ) { this.variableValues = variableValues; this.metastoreJson = metastoreJson; this.stepPluginClasses = stepPluginClasses; this.xpPluginClasses = xpPluginClasses; this.batchSize = batchSize; - this.flushIntervalSeconds = flushIntervalSeconds; + this.flushIntervalMs = flushIntervalMs; this.stepname = stepname; this.stepPluginId = stepPluginId; this.stepMetaInterfaceXml = stepMetaInterfaceXml; @@ -164,6 +160,8 @@ public StepTransform( List variableValues, String metastoreJson, private class StepFn extends DoFn { + private static final long serialVersionUID = 95700000000000001L; + public static final String INJECTOR_STEP_NAME = "_INJECTOR_"; protected List variableValues; @@ -205,18 +203,9 @@ private class StepFn extends DoFn { private transient Counter initCounter; private transient Counter readCounter; private transient Counter writtenCounter; - private transient Counter startBundleCounter; - private transient Counter flushBufferCounter; private transient SingleThreadedTransExecutor executor; - private transient List rowBuffer; - private transient BoundedWindow batchWindow; - - private transient AtomicLong bufferStartTime; - private transient AtomicBoolean flushing; - private transient Timer timer; - public StepFn() { } @@ -250,28 +239,26 @@ public StepFn( List variableValues, String metastoreJson, List(); - if ( startBundleCounter == null ) { - startBundleCounter = Metrics.counter( "startBundle", stepname ); - } - startBundleCounter.inc(); + Metrics.counter( "startBundle", stepname ).inc(); if ( "ScriptValueMod".equals( stepPluginId ) && trans != null ) { initialize = true; } } + @Setup + public void setup() { + // Nothing + } + @Teardown public void tearDown() { - if ( timer != null ) { - timer.cancel(); - } + // Nothing } @ProcessElement public void processElement( ProcessContext context, BoundedWindow window ) { try { - if ( initialize ) { initialize = false; @@ -300,12 +287,10 @@ public void processElement( ProcessContext context, BoundedWindow window ) { // Input row metadata... // inputRowMeta = JsonRowMeta.fromJson( inputRowMetaJson ); - // System.out.println( "======== INPUT ROW META : "+inputRowMeta.toString() ); infoRowMetas = new ArrayList<>(); for ( String infoRowMetaJson : infoRowMetaJsons ) { RowMetaInterface infoRowMeta = JsonRowMeta.fromJson( infoRowMetaJson ); infoRowMetas.add( infoRowMeta ); - // System.out.println( "======== INFO ROW META : "+infoRowMeta.toString() ); } // Create an Injector step with the right row layout... @@ -365,7 +350,7 @@ public void processElement( ProcessContext context, BoundedWindow window ) { throw new KettleException( "Unable to load step plugin with ID " + stepPluginId + ", this plugin isn't in the plugin registry or classpath" ); } - KettleBeamUtil.loadStepMetadataFromXml( stepname, stepMetaInterface, stepMetaInterfaceXml, metaStore ); + KettleBeamUtil.loadStepMetadataFromXml( stepname, stepMetaInterface, stepMetaInterfaceXml, transMeta.getMetaStore() ); stepMeta = new StepMeta( stepname, stepMetaInterface ); stepMeta.setStepID( stepPluginId ); @@ -389,13 +374,6 @@ public void processElement( ProcessContext context, BoundedWindow window ) { stepMetaInterface.searchInfoAndTargetSteps( transMeta.getSteps() ); - - // For Matt debugging this spiderweb. - // - // FileOutputStream fos = new FileOutputStream( "/tmp/" + ( stepname.replace( "/", "-" ) ) + ".ktr" ); - // fos.write( transMeta.getXML().getBytes() ); - // fos.close(); - // This one is single threaded folks // transMeta.setTransformationType( TransMeta.TransformationType.SingleThreaded ); @@ -404,7 +382,7 @@ public void processElement( ProcessContext context, BoundedWindow window ) { // trans = new Trans( transMeta ); trans.setLogLevel( LogLevel.ERROR ); - trans.setMetaStore( metaStore ); + trans.setMetaStore( transMeta.getMetaStore() ); trans.prepareExecution( null ); // Create producers so we can efficiently pass data @@ -429,15 +407,10 @@ public void processElement( ProcessContext context, BoundedWindow window ) { StepMetaDataCombi stepCombi = findCombi( trans, stepname ); stepCombis.add( stepCombi ); outputRowMeta = transMeta.getStepFields( stepname ); - // System.out.println("======== OUTPUT ROW METADATA : "+outputRowMeta.toString()); if ( targetSteps.isEmpty() ) { rowListener = new RowAdapter() { @Override public void rowWrittenEvent( RowMetaInterface rowMeta, Object[] row ) throws KettleStepException { - - // if (infoSteps.size()>0 && resultRows.size()==0) { - // System.out.println( ">>>>>>>>> CAPTURE ROW: "+rowMeta.toString()); - // } resultRows.add( row ); } }; @@ -485,7 +458,6 @@ public void processElement( ProcessContext context, BoundedWindow window ) { initCounter = Metrics.counter( "init", stepname ); readCounter = Metrics.counter( "read", stepname ); writtenCounter = Metrics.counter( "written", stepname ); - flushBufferCounter = Metrics.counter( "flushBuffer", stepname ); initCounter.inc(); @@ -507,10 +479,7 @@ public void processElement( ProcessContext context, BoundedWindow window ) { // Pass and process the rows in the info steps // - // System.out.println("<<<<<<<<<<<<<<<<< Producing "+infoDataSet.size()+" info rows for step "+infoSteps.get(i)+" : "+infoRowMeta.toString()); for ( KettleRow infoRowData : infoDataSet ) { - - // System.out.println("<<<<< Row: "+infoRowMeta.getString( infoRowData.getRow() )); infoRowProducer.putRow( infoRowMeta, infoRowData.getRow() ); combi.step.processRow( combi.meta, combi.data ); } @@ -524,38 +493,6 @@ public void processElement( ProcessContext context, BoundedWindow window ) { // combi.step.processRow( combi.meta, combi.data ); } - - // Start with an empty buffer... - rowBuffer = new ArrayList<>(); - - // When the first row ends up in the buffer we start the timer. - // If the rows are flushed out we reset back to -1; - // - bufferStartTime = new AtomicLong( -1L ); - flushing = new AtomicBoolean( false ); - - // Install a timer to check every second if the buffer is stale and needs to be flushed... - // - if ( flushIntervalSeconds > 0 ) { - TimerTask timerTask = new TimerTask() { - @Override public void run() { - // Check on the state of the buffer, flush if needed... - // - if ( !flushing.get() && !rowBuffer.isEmpty() && bufferStartTime.get() > 0 ) { - long difference = System.currentTimeMillis() - bufferStartTime.get(); - if ( difference > flushIntervalSeconds * 1000 ) { - try { - emptyRowBuffer( new StepProcessContext( context ) ); - } catch ( Exception e ) { - throw new RuntimeException( "Unable to flush row buffer when it got stale after " + difference + " ms", e ); - } - } - } - } - }; - timer = new Timer( "Flush timer of step " + stepname ); - timer.schedule( timerTask, 1000, 1000 ); - } } // Get one row from the context main input and make a copy so we can change it. @@ -564,20 +501,7 @@ public void processElement( ProcessContext context, BoundedWindow window ) { KettleRow inputRow = KettleBeamUtil.copyKettleRow( originalInputRow, inputRowMeta ); readCounter.inc(); - // Take care of the age of the buffer... - // - if ( flushIntervalSeconds > 0 && rowBuffer.isEmpty() ) { - bufferStartTime.set( System.currentTimeMillis() ); - } - - // Add the row to the buffer. - // - rowBuffer.add( inputRow ); - batchWindow = window; - - if ( rowBuffer.size() >= batchSize ) { - emptyRowBuffer( new StepProcessContext( context ) ); - } + emptyRowBuffer( new StepProcessContext( context ), inputRow ); } catch ( Exception e ) { numErrors.inc(); LOG.info( "Step execution error :" + e.getMessage() ); @@ -588,100 +512,65 @@ public void processElement( ProcessContext context, BoundedWindow window ) { @FinishBundle public void finishBundle( FinishBundleContext context ) { try { - if ( rowBuffer.size() > 0 ) { - long start = System.currentTimeMillis(); - while (!emptyRowBuffer( new StepFinishBundleContext( context, batchWindow ) )) { - try { - Thread.sleep( 1 ); - } catch(InterruptedException e) { - // Ignore - } - long now = System.currentTimeMillis(); - double secondsPassed = (now-start)/1000; - if (secondsPassed>30) { - LOG.error( "Unable to empty row buffer while finishing bundle" ); - throw new RuntimeException( "Error emptying bundle in step "+stepname ); - } - } - } + // Nothing to do here } catch ( Exception e ) { numErrors.inc(); LOG.info( "Step finishing bundle error :" + e.getMessage() ); - throw new RuntimeException( "Error finishing bundle of StepFn", e ); + throw new RuntimeException( "Error finalizing bundle of step '" + stepname + "'", e ); } } private transient int maxInputBufferSize = 0; private transient int minInputBufferSize = Integer.MAX_VALUE; - private synchronized boolean emptyRowBuffer( TupleOutputContext context ) throws KettleException { - boolean emptied = false; - if ( !flushing.get() ) { - try { - flushing.set( true ); - - // Empty all the row buffers for another iteration - // - resultRows.clear(); - for ( int t = 0; t < targetSteps.size(); t++ ) { - targetResultRowsList.get( t ).clear(); - } - - // Pass the rows in the rowBuffer to the input RowSet - // - if ( !inputStep ) { - if ( maxInputBufferSize < rowBuffer.size() ) { - Metrics.counter( "maxInputSize", stepname ).inc( rowBuffer.size() - maxInputBufferSize ); - maxInputBufferSize = rowBuffer.size(); - } - if ( minInputBufferSize > rowBuffer.size() ) { - Metrics.counter( "minInputSize", stepname ).dec( minInputBufferSize - minInputBufferSize ); - minInputBufferSize = rowBuffer.size(); - } - - for ( KettleRow inputRow : rowBuffer ) { - rowProducer.putRow( inputRowMeta, inputRow.getRow() ); - } - } + /** + * Attempt to empty the row buffer + * + * @param context + * @param inputRow + * @throws KettleException + */ + private synchronized void emptyRowBuffer( TupleOutputContext context, KettleRow inputRow ) throws KettleException { + // Empty all the row buffers for another iteration + // + resultRows.clear(); + for ( int t = 0; t < targetSteps.size(); t++ ) { + targetResultRowsList.get( t ).clear(); + } - // Execute all steps in the transformation - // - executor.oneIteration(); + // Pass the rows in the rowBuffer to the input RowSet + // + if ( !inputStep ) { + rowProducer.putRow( inputRowMeta, inputRow.getRow() ); + } - // Evaluate the results... - // + // Execute all steps in the transformation + // + executor.oneIteration(); - // Pass all rows in the output to the process context - // - // System.out.println("Rows read in main output of step '"+stepname+"' : "+resultRows.size()); - for ( Object[] resultRow : resultRows ) { + // Evaluate the results... + // - // Pass the row to the process context - // - context.output( mainTupleTag, new KettleRow( resultRow ) ); - writtenCounter.inc(); - } + // Pass all rows in the output to the process context + // + for ( Object[] resultRow : resultRows ) { - // Pass whatever ended up on the target nodes - // - for ( int t = 0; t < targetResultRowsList.size(); t++ ) { - List targetRowsList = targetResultRowsList.get( t ); - TupleTag tupleTag = tupleTagList.get( t ); + // Pass the row to the process context + // + context.output( mainTupleTag, new KettleRow( resultRow ) ); + writtenCounter.inc(); + } - for ( Object[] targetRow : targetRowsList ) { - context.output( tupleTag, new KettleRow( targetRow ) ); - } - } + // Pass whatever ended up on the target nodes + // + for ( int t = 0; t < targetResultRowsList.size(); t++ ) { + List targetRowsList = targetResultRowsList.get( t ); + TupleTag tupleTag = tupleTagList.get( t ); - flushBufferCounter.inc(); - rowBuffer.clear(); - bufferStartTime.set( -1L ); - emptied = true; - } finally { - flushing.set( false ); + for ( Object[] targetRow : targetRowsList ) { + context.output( tupleTag, new KettleRow( targetRow ) ); } } - return emptied; } private StepMeta createInjectorStep( TransMeta transMeta, String injectorStepName, RowMetaInterface injectorRowMeta, int x, int y ) { diff --git a/src/main/java/org/kettle/beam/metastore/messages/messages_en_US.properties b/src/main/java/org/kettle/beam/metastore/messages/messages_en_US.properties index 3059ee4..51d2e32 100644 --- a/src/main/java/org/kettle/beam/metastore/messages/messages_en_US.properties +++ b/src/main/java/org/kettle/beam/metastore/messages/messages_en_US.properties @@ -32,7 +32,7 @@ BeamJobConfigDialog.XpPluginClasses.Button = Find BeamJobConfigDialog.FatJar.Label = Fat jar (file location) to use for execution BeamJobConfigDialog.FatJar.Button = Build -BeamJobConfigDialog.StreamingKettleStepsFlushInterval.Label = Streaming flush interval for buffered step data (s) +BeamJobConfigDialog.StreamingKettleStepsFlushInterval.Label = Streaming flush interval for buffered step data (ms) BeamJobConfigDialog.GcpProjectId.Label = Project ID BeamJobConfigDialog.GcpAppName.Label = App name diff --git a/src/main/java/org/kettle/beam/perspective/BeamHelper.java b/src/main/java/org/kettle/beam/perspective/BeamHelper.java index 0448564..89aff3e 100755 --- a/src/main/java/org/kettle/beam/perspective/BeamHelper.java +++ b/src/main/java/org/kettle/beam/perspective/BeamHelper.java @@ -73,6 +73,7 @@ import org.pentaho.di.core.variables.Variables; import org.pentaho.di.i18n.BaseMessages; import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.ui.core.dialog.EnterSelectionDialog; import org.pentaho.di.ui.core.dialog.ErrorDialog; import org.pentaho.di.ui.spoon.ISpoonMenuController; @@ -556,4 +557,33 @@ public void exportMetaStore() { } + private void setCurrentStepBeamFlag(String key, String value) { + TransGraph transGraph = spoon.getActiveTransGraph(); + if (transGraph==null) { + return; + } + StepMeta stepMeta = transGraph.getCurrentStep(); + if (stepMeta==null) { + return; + } + stepMeta.setAttribute(BeamConst.STRING_KETTLE_BEAM, key, value); + transGraph.redraw(); + } + + public void setBatching() { + setCurrentStepBeamFlag(BeamConst.STRING_STEP_FLAG_BATCH, "true"); + } + + public void clearBatching() { + setCurrentStepBeamFlag(BeamConst.STRING_STEP_FLAG_BATCH, "false"); + } + + public void setSingleThreaded() { + setCurrentStepBeamFlag(BeamConst.STRING_STEP_FLAG_SINGLE_THREADED, "true"); + } + + public void clearSingleThreaded() { + setCurrentStepBeamFlag(BeamConst.STRING_STEP_FLAG_SINGLE_THREADED, "false"); + } + } diff --git a/src/main/java/org/kettle/beam/perspective/BeamSpoonPlugin.java b/src/main/java/org/kettle/beam/perspective/BeamSpoonPlugin.java index 895d1b4..90cb6c5 100644 --- a/src/main/java/org/kettle/beam/perspective/BeamSpoonPlugin.java +++ b/src/main/java/org/kettle/beam/perspective/BeamSpoonPlugin.java @@ -28,7 +28,7 @@ import org.pentaho.ui.xul.XulException; @SpoonPlugin( id = "BeamSpoonPlugin", image = "" ) -@SpoonPluginCategories( { "spoon" } ) +@SpoonPluginCategories( { "spoon", "trans-graph" } ) public class BeamSpoonPlugin implements SpoonPluginInterface, SpoonLifecycleListener { private static final Class PKG = BeamSpoonPlugin.class; @@ -52,6 +52,10 @@ public void applyToContainer( String category, XulDomContainer container ) throw container.loadOverlay( "beam_spoon_overlays.xul", resourceBundle ); container.addEventHandler( BeamHelper.getInstance() ); } + if ( category.equals( "trans-graph" ) ) { + container.loadOverlay( "beam_transgraph_overlays.xul", resourceBundle ); + container.addEventHandler( BeamHelper.getInstance() ); + } } @Override diff --git a/src/main/java/org/kettle/beam/perspective/TransDrawExtensionPoint.java b/src/main/java/org/kettle/beam/perspective/TransDrawExtensionPoint.java new file mode 100644 index 0000000..2adf853 --- /dev/null +++ b/src/main/java/org/kettle/beam/perspective/TransDrawExtensionPoint.java @@ -0,0 +1,46 @@ +package org.kettle.beam.perspective; + +import org.apache.commons.lang.StringUtils; +import org.kettle.beam.util.BeamConst; +import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.extension.ExtensionPoint; +import org.pentaho.di.core.extension.ExtensionPointInterface; +import org.pentaho.di.core.gui.Point; +import org.pentaho.di.core.gui.PrimitiveGCInterface; +import org.pentaho.di.core.logging.LogChannelInterface; +import org.pentaho.di.trans.TransPainterExtension; + +@ExtensionPoint( + id = "Beam.TransDrawExtensionPoint", + description = "Draw batch or single threaded for Beam", + extensionPointId = "TransPainterStep" +) +public class TransDrawExtensionPoint implements ExtensionPointInterface { + @Override public void callExtensionPoint( LogChannelInterface log, Object object ) throws KettleException { + if ( !( object instanceof TransPainterExtension ) ) { + return; + } + TransPainterExtension ext = (TransPainterExtension) object; + boolean batch = "true".equalsIgnoreCase( ext.stepMeta.getAttribute( BeamConst.STRING_KETTLE_BEAM, BeamConst.STRING_STEP_FLAG_BATCH ) ); + boolean single = "true".equalsIgnoreCase( ext.stepMeta.getAttribute( BeamConst.STRING_KETTLE_BEAM, BeamConst.STRING_STEP_FLAG_SINGLE_THREADED ) ); + if (!batch && !single) { + return; + } + String str = ""; + if ( batch ) { + str += "Batch"; + } + if ( single ) { + if ( batch ) { + str += " / "; + } + str += "Single"; + } + if ( StringUtils.isNotEmpty( str ) ) { + str="Beam "+str; + Point strSize = ext.gc.textExtent( str ); + ext.gc.setFont( PrimitiveGCInterface.EFont.NOTE ); + ext.gc.drawText( str, ext.x1 + ext.iconsize, ext.y1 - strSize.y ); + } + } +} diff --git a/src/main/java/org/kettle/beam/pipeline/handler/BeamGenericStepHandler.java b/src/main/java/org/kettle/beam/pipeline/handler/BeamGenericStepHandler.java index 6b5b49c..4ea4d6c 100644 --- a/src/main/java/org/kettle/beam/pipeline/handler/BeamGenericStepHandler.java +++ b/src/main/java/org/kettle/beam/pipeline/handler/BeamGenericStepHandler.java @@ -7,21 +7,28 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Keys; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Partition; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.kettle.beam.core.KettleRow; import org.kettle.beam.core.fn.StringToKettleRowFn; +import org.kettle.beam.core.partition.SinglePartitionFn; import org.kettle.beam.core.shared.VariableValue; +import org.kettle.beam.core.transform.StepBatchTransform; import org.kettle.beam.core.transform.StepTransform; import org.kettle.beam.core.util.JsonRowMeta; import org.kettle.beam.core.util.KettleBeamUtil; import org.kettle.beam.metastore.BeamJobConfig; +import org.kettle.beam.util.BeamConst; import org.pentaho.di.core.Const; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.logging.LogChannelInterface; @@ -56,6 +63,7 @@ public BeamGenericStepHandler( BeamJobConfig beamJobConfig, IMetaStore metaStore // boolean inputStep = input == null; boolean reduceParallelism = checkStepCopiesForReducedParallelism( stepMeta ); + reduceParallelism=reduceParallelism || needsSingleThreading( stepMeta ); String stepMetaInterfaceXml = XMLHandler.openTag( StepMeta.XML_TAG ) + stepMeta.getStepMetaInterface().getXML() + XMLHandler.closeTag( StepMeta.XML_TAG ); @@ -96,20 +104,27 @@ public BeamGenericStepHandler( BeamJobConfig beamJobConfig, IMetaStore metaStore // This is what the BeamJobConfig option "Streaming Kettle Steps Flush Interval" is for... // Without a valid value we default to -1 to disable flushing. // - int flushIntervalSeconds = Const.toInt(beamJobConfig.getStreamingKettleStepsFlushInterval(), -1); + int flushIntervalMs = Const.toInt(beamJobConfig.getStreamingKettleStepsFlushInterval(), -1); // Send all the information on their way to the right nodes // - StepTransform stepTransform = new StepTransform( variableValues, metaStoreJson, stepPluginClasses, xpPluginClasses, transMeta.getSizeRowset(), flushIntervalSeconds, - stepMeta.getName(), stepMeta.getStepID(), stepMetaInterfaceXml, JsonRowMeta.toJson( rowMeta ), inputStep, - targetSteps, infoSteps, infoRowMetaJsons, infoCollectionViews ); + PTransform, PCollectionTuple> stepTransform; + if (needsBatching(stepMeta)) { + stepTransform = new StepBatchTransform( variableValues, metaStoreJson, stepPluginClasses, xpPluginClasses, transMeta.getSizeRowset(), flushIntervalMs, + stepMeta.getName(), stepMeta.getStepID(), stepMetaInterfaceXml, JsonRowMeta.toJson( rowMeta ), inputStep, + targetSteps, infoSteps, infoRowMetaJsons, infoCollectionViews ); + } else { + stepTransform = new StepTransform( variableValues, metaStoreJson, stepPluginClasses, xpPluginClasses, transMeta.getSizeRowset(), flushIntervalMs, + stepMeta.getName(), stepMeta.getStepID(), stepMetaInterfaceXml, JsonRowMeta.toJson( rowMeta ), inputStep, + targetSteps, infoSteps, infoRowMetaJsons, infoCollectionViews ); + } if ( input == null ) { // Start from a dummy row and group over it. // Trick Beam into only running a single thread of the step that comes next. // input = pipeline - .apply( Create.of( Arrays.asList( "kettle-dummy-input-value" ) ) ).setCoder( StringUtf8Coder.of() ) + .apply( Create.of( Arrays.asList( "kettle-single-value" ) ) ).setCoder( StringUtf8Coder.of() ) .apply( WithKeys.of( (Void) null ) ) .apply( GroupByKey.create() ) .apply( Values.create() ) @@ -121,15 +136,28 @@ public BeamGenericStepHandler( BeamJobConfig beamJobConfig, IMetaStore metaStore String tupleId = KettleBeamUtil.createMainInputTupleId( stepMeta.getName() ); stepCollectionMap.put( tupleId, input ); } else if ( reduceParallelism ) { - - // group across all fields to get down to a single thread... - // - input = input.apply( WithKeys.of( (Void) null ) ) - .setCoder( KvCoder.of( VoidCoder.of(), input.getCoder() ) ) - .apply( GroupByKey.create() ) - .apply( Values.create() ) - .apply( Flatten.iterables() ) - ; + PCollection.IsBounded isBounded = input.isBounded(); + if (isBounded== PCollection.IsBounded.BOUNDED) { + // group across all fields to get down to a single thread... + // + input = input.apply( WithKeys.of( (Void) null ) ) + .setCoder( KvCoder.of( VoidCoder.of(), input.getCoder() ) ) + .apply( GroupByKey.create() ) + .apply( Values.create() ) + .apply( Flatten.iterables() ) + ; + } else { + + // Streaming: try partitioning over a single partition + // NOTE: doesn't seem to work + /* + input = input + .apply( Partition.of( 1, new SinglePartitionFn() ) ) + .apply( Flatten.pCollections() ) + ; + */ + throw new KettleException( "Unable to reduce parallel in an unbounded (streaming) pipeline in step : "+stepMeta.getName() ); + } } // Apply the step transform to the previous io step PCollection(s) @@ -158,6 +186,16 @@ public BeamGenericStepHandler( BeamJobConfig beamJobConfig, IMetaStore metaStore log.logBasic( "Handled step (STEP) : " + stepMeta.getName() + ", gets data from " + previousSteps.size() + " previous step(s), targets=" + targetSteps.size() + ", infos=" + infoSteps.size() ); } + public static boolean needsBatching( StepMeta stepMeta ) { + String value = stepMeta.getAttribute( BeamConst.STRING_KETTLE_BEAM, BeamConst.STRING_STEP_FLAG_BATCH ); + return value!=null && "true".equalsIgnoreCase( value ); + } + + public static boolean needsSingleThreading( StepMeta stepMeta ) { + String value = stepMeta.getAttribute( BeamConst.STRING_KETTLE_BEAM, BeamConst.STRING_STEP_FLAG_SINGLE_THREADED ); + return value!=null && "true".equalsIgnoreCase( value ); + } + private boolean checkStepCopiesForReducedParallelism( StepMeta stepMeta ) { if ( stepMeta.getCopiesString() == null ) { return false; diff --git a/src/main/java/org/kettle/beam/steps/kafka/BeamConsumeMeta.java b/src/main/java/org/kettle/beam/steps/kafka/BeamConsumeMeta.java index 2f9031b..5259dfa 100644 --- a/src/main/java/org/kettle/beam/steps/kafka/BeamConsumeMeta.java +++ b/src/main/java/org/kettle/beam/steps/kafka/BeamConsumeMeta.java @@ -77,7 +77,7 @@ public BeamConsumeMeta() { usingLogAppendTime = false; usingCreateTime = false; restrictedToCommitted = false; - allowingCommitOnConsumedOffset = false; + allowingCommitOnConsumedOffset = true; configOptions = new ArrayList<>(); } diff --git a/src/main/java/org/kettle/beam/util/BeamConst.java b/src/main/java/org/kettle/beam/util/BeamConst.java index 57fcac4..30cd6b8 100644 --- a/src/main/java/org/kettle/beam/util/BeamConst.java +++ b/src/main/java/org/kettle/beam/util/BeamConst.java @@ -38,6 +38,10 @@ public class BeamConst { public static final String STRING_KETTLE_BEAM = "Kettle Beam"; + public static final String STRING_STEP_FLAG_BATCH = "Batch"; + public static final String STRING_STEP_FLAG_SINGLE_THREADED = "SingleThreaded"; + + private static List gcpWorkerCodeDescriptions = Arrays.asList( new String[] { "n1-standard-1", "Standard machine type with 1 vCPU and 3.75 GB of memory." }, new String[] { "n1-standard-2", "Standard machine type with 2 vCPUs and 7.5 GB of memory." }, diff --git a/src/main/resources/beam_transgraph_overlays.xul b/src/main/resources/beam_transgraph_overlays.xul new file mode 100755 index 0000000..bf9d408 --- /dev/null +++ b/src/main/resources/beam_transgraph_overlays.xul @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + +