From 90824da04da3c2fe85aa8a0a84cec40c250a43cd Mon Sep 17 00:00:00 2001 From: FeizNouri Date: Fri, 4 Oct 2019 09:41:53 +0100 Subject: [PATCH 1/2] added the SplitRecord processor --- .../logisland-processor-common/pom.xml | 6 + .../logisland/processor/SplitRecord.java | 144 +++++++++++++ .../logisland/processor/SplitRecordTest.java | 199 ++++++++++++++++++ 3 files changed, 349 insertions(+) create mode 100644 logisland-components/logisland-processors/logisland-processor-common/src/main/java/com/hurence/logisland/processor/SplitRecord.java create mode 100644 logisland-components/logisland-processors/logisland-processor-common/src/test/java/com/hurence/logisland/processor/SplitRecordTest.java diff --git a/logisland-components/logisland-processors/logisland-processor-common/pom.xml b/logisland-components/logisland-processors/logisland-processor-common/pom.xml index bcafd566e..2b2fe51ac 100644 --- a/logisland-components/logisland-processors/logisland-processor-common/pom.xml +++ b/logisland-components/logisland-processors/logisland-processor-common/pom.xml @@ -135,6 +135,12 @@ ${scala.version} compile + + org.scala-lang + scala-library + 2.12.9 + compile + diff --git a/logisland-components/logisland-processors/logisland-processor-common/src/main/java/com/hurence/logisland/processor/SplitRecord.java b/logisland-components/logisland-processors/logisland-processor-common/src/main/java/com/hurence/logisland/processor/SplitRecord.java new file mode 100644 index 000000000..3629b53cb --- /dev/null +++ b/logisland-components/logisland-processors/logisland-processor-common/src/main/java/com/hurence/logisland/processor/SplitRecord.java @@ -0,0 +1,144 @@ +package com.hurence.logisland.processor; + +import com.hurence.logisland.annotation.behavior.DynamicProperty; +import com.hurence.logisland.annotation.documentation.CapabilityDescription; +import com.hurence.logisland.component.InitializationException; +import com.hurence.logisland.component.PropertyDescriptor; +import com.hurence.logisland.record.Field; +import com.hurence.logisland.record.Record; +import com.hurence.logisland.record.StandardRecord; +import com.hurence.logisland.validator.StandardValidators; +import com.hurence.logisland.validator.ValidationContext; +import com.hurence.logisland.validator.ValidationResult; + + + +import java.util.*; + +@CapabilityDescription("This processor is used to create a new set of records from one record.") +@DynamicProperty(name = "new record name", + supportsExpressionLanguage = true, + value = "fields to have", + description = "the new record") +public class SplitRecord extends AbstractProcessor { + static final long serialVersionUID = 1413578915552852739L; + + public static final PropertyDescriptor KEEP_PARENT_RECORD = new PropertyDescriptor.Builder() + .name("keep.parent.record") + .description("Specify if the parent record should exist") + .required(false) + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor KEEP_PARENT_RECORD_TYPE = new PropertyDescriptor.Builder() + .name("keep.parent.record_type") + .description("Specify whether to use the dynamic property name as record_type or not") + .required(false) + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor KEEP_PARENT_RECORD_TIME = new PropertyDescriptor.Builder() + .name("keep.parent.record_time") + .description("Specify whether to use the processing_time as record_time or not") + .required(false) + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + @Override + public List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(KEEP_PARENT_RECORD); + descriptors.add(KEEP_PARENT_RECORD_TIME); + descriptors.add(KEEP_PARENT_RECORD_TYPE); + return Collections.unmodifiableList(descriptors); + } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .dynamic(true) + .build(); + } + @Override + public void init(final ProcessContext context) throws InitializationException { + super.init(context); + } + /*@Override + protected Collection customValidate(final ValidationContext context) { + final List validationResults = new ArrayList<>(super.customValidate(context)); + try { + + } catch (Exception ex) { + validationResults.add( + new ValidationResult.Builder() + .input(ex.getMessage()) + .valid(false) + .build()); + } + return validationResults; + }*/ + + @Override + public Collection process(ProcessContext context, Collection records) { + final boolean keepParent = context.getPropertyValue(KEEP_PARENT_RECORD).asBoolean(); + final boolean keepParentType = context.getPropertyValue(KEEP_PARENT_RECORD_TYPE).asBoolean(); + final boolean keepParentTime = context.getPropertyValue(KEEP_PARENT_RECORD_TIME).asBoolean(); + Collection oringinRecods = new ArrayList<>(); + try { + init(context); + } catch (Throwable t) { + getLogger().error("error while initializing", t); + } + try { + for (Record record : records) { + for (Map.Entry entry : context.getProperties().entrySet()) { + if (!entry.getKey().isDynamic()) { + continue; + } + String newRecordType = entry.getKey().getName(); + String fieldName = entry.getValue(); + List fieldNames = getFieldsNames(fieldName); + Map map = new HashMap<>(); + Set> fields = record.getFieldsEntrySet(); + for (String field : fieldNames) { + if (record.hasField(field)) + map.put(field, record.getField(field)); + else throw new IllegalArgumentException("there is no field " + field); + } + Field idField = new Field(record.getId()); + map.put("parent_record_id", idField); + long recordTime = 0; + if(!keepParentTime) { + recordTime = System.nanoTime(); + } else {recordTime = record.getTime().getTime(); } + String recordType = null; + if (keepParentType) { + recordType = record.getType(); + } else {recordType = newRecordType;} + Record newRecord = new StandardRecord().setFields(map).setType(recordType).setTime(recordTime).setId(newRecordType); + oringinRecods.add(newRecord); + } + if (keepParent) { + oringinRecods.add(record); + } + } + } catch (Throwable t) { + getLogger().error("error while processing records ", t); + } + return oringinRecods; + } + + private List getFieldsNames (String fields) { + String[] field = fields.split(","); + List s1 = new ArrayList<>() ; + for (String s : field) { + s1.add(s.replaceAll(" ", "")) ; + } + return s1; + } + +} diff --git a/logisland-components/logisland-processors/logisland-processor-common/src/test/java/com/hurence/logisland/processor/SplitRecordTest.java b/logisland-components/logisland-processors/logisland-processor-common/src/test/java/com/hurence/logisland/processor/SplitRecordTest.java new file mode 100644 index 000000000..effeb4faf --- /dev/null +++ b/logisland-components/logisland-processors/logisland-processor-common/src/test/java/com/hurence/logisland/processor/SplitRecordTest.java @@ -0,0 +1,199 @@ +package com.hurence.logisland.processor; + +import com.hurence.logisland.record.FieldType; +import com.hurence.logisland.record.Record; +import com.hurence.logisland.record.StandardRecord; +import com.hurence.logisland.util.runner.MockRecord; +import com.hurence.logisland.util.runner.TestRunner; +import com.hurence.logisland.util.runner.TestRunners; +import org.junit.Test; + +public class SplitRecordTest { + @Test + public void testValidity() { + final TestRunner testRunner = TestRunners.newTestRunner(new SplitRecord()); + testRunner.assertValid(); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD, "00"); + testRunner.assertNotValid(); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD, "false"); + testRunner.assertValid(); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD_TIME, "false"); + testRunner.assertValid(); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD_TYPE, "tru"); + testRunner.assertNotValid(); + } + + @Test + public void testRecords() { + Record record1 = new StandardRecord(); + record1.setField("field1", FieldType.STRING, "Hello World"); + record1.setField("field2", FieldType.STRING, "Logisland"); + record1.setField("field3", FieldType.INT, 1000); + + TestRunner testRunner = TestRunners.newTestRunner(new SplitRecord()); + testRunner.setProperty("record_type1", "field1, field2"); + testRunner.setProperty("record_type2", "field3"); + testRunner.setProperty("record_type3", "field1, field3"); + testRunner.assertValid(); + testRunner.enqueue(record1); + testRunner.run(); + testRunner.assertAllInputRecordsProcessed(); + testRunner.assertOutputRecordsCount(3); + + MockRecord out = testRunner.getOutputRecords().get(2); + out.assertRecordSizeEquals(3); + out.assertFieldTypeEquals("field1", FieldType.STRING); + out.assertFieldTypeEquals("field2", FieldType.STRING); + out.assertFieldTypeEquals("record_time", FieldType.LONG); + out.assertFieldTypeEquals("record_type", FieldType.STRING); + out.assertFieldEquals("field1", "Hello World"); + out.assertFieldEquals("field2", "Logisland"); + out.assertFieldEquals("record_type", "record_type1"); + out.assertFieldEquals("record_time", record1.getTime().getTime()); + + MockRecord out1 = testRunner.getOutputRecords().get(0); + out1.assertRecordSizeEquals(2); + out1.assertFieldTypeEquals("field3", FieldType.INT); + out1.assertFieldTypeEquals("record_time", FieldType.LONG); + out1.assertFieldTypeEquals("record_type", FieldType.STRING); + out1.assertFieldEquals("field3", 1000); + out1.assertFieldEquals("record_type", "record_type2"); + out1.assertFieldEquals("record_time", record1.getTime().getTime()); + + MockRecord out2 = testRunner.getOutputRecords().get(1); + out2.assertRecordSizeEquals(3); + out2.assertFieldTypeEquals("field1", FieldType.STRING); + out2.assertFieldTypeEquals("field3", FieldType.INT); + out2.assertFieldTypeEquals("record_time", FieldType.LONG); + out2.assertFieldTypeEquals("record_type", FieldType.STRING); + out2.assertFieldEquals("field1", "Hello World"); + out2.assertFieldEquals("field3", 1000); + out2.assertFieldEquals("record_type", "record_type3"); + out2.assertFieldEquals("record_time", record1.getTime().getTime()); + } + @Test + public void testRecordsKeepParent() { + Record record1 = new StandardRecord(); + record1.setField("field1", FieldType.STRING, "Hello World"); + record1.setField("field2", FieldType.STRING, "Logisland"); + record1.setField("field3", FieldType.INT, 1000); + + TestRunner testRunner = TestRunners.newTestRunner(new SplitRecord()); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD, "true"); + testRunner.setProperty("record_type1", "field1, field2"); + testRunner.setProperty("record_type2", "field3"); + testRunner.setProperty("record_type3", "field1, field3"); + testRunner.assertValid(); + testRunner.enqueue(record1); + testRunner.run(); + testRunner.assertAllInputRecordsProcessed(); + testRunner.assertOutputRecordsCount(4); + + MockRecord out = testRunner.getOutputRecords().get(2); + out.assertRecordSizeEquals(3); + out.assertFieldTypeEquals("field1", FieldType.STRING); + out.assertFieldTypeEquals("field2", FieldType.STRING); + out.assertFieldEquals("field1", "Hello World"); + out.assertFieldEquals("field2", "Logisland"); + + MockRecord out1 = testRunner.getOutputRecords().get(0); + out1.assertRecordSizeEquals(2); + out1.assertFieldTypeEquals("field3", FieldType.INT); + out1.assertFieldEquals("field3", 1000); + + MockRecord out2 = testRunner.getOutputRecords().get(1); + out2.assertRecordSizeEquals(3); + out2.assertFieldTypeEquals("field1", FieldType.STRING); + out2.assertFieldTypeEquals("field3", FieldType.INT); + out2.assertFieldEquals("field1", "Hello World"); + out2.assertFieldEquals("field3", 1000); + + MockRecord out3 = testRunner.getOutputRecords().get(3); + out3.assertRecordSizeEquals(3); + out3.assertFieldTypeEquals("field1", FieldType.STRING); + out3.assertFieldTypeEquals("field2", FieldType.STRING); + out3.assertFieldTypeEquals("field3", FieldType.INT); + out3.assertFieldEquals("field1", "Hello World"); + out3.assertFieldEquals("field2", "Logisland"); + out3.assertFieldEquals("field3", 1000); + + } + @Test + public void testRecordsKeepAll() { + Record record1 = new StandardRecord(); + record1.setField("field1", FieldType.STRING, "Hello World"); + record1.setField("field2", FieldType.STRING, "Logisland"); + record1.setField("field3", FieldType.INT, 1000); + + TestRunner testRunner = TestRunners.newTestRunner(new SplitRecord()); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD, "true"); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD_TIME, "false"); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD_TYPE, "true"); + testRunner.setProperty("record_type1", "field1, field2"); + testRunner.setProperty("record_type2", "field3"); + testRunner.setProperty("record_type3", "field1, field3"); + testRunner.assertValid(); + testRunner.enqueue(record1); + testRunner.run(); + testRunner.assertAllInputRecordsProcessed(); + testRunner.assertOutputRecordsCount(4); + + MockRecord out = testRunner.getOutputRecords().get(2); + out.assertRecordSizeEquals(3); + out.assertFieldTypeEquals("field1", FieldType.STRING); + out.assertFieldTypeEquals("field2", FieldType.STRING); + out.assertFieldTypeEquals("record_type", FieldType.STRING); + out.assertFieldTypeEquals("record_time", FieldType.LONG); + out.assertFieldEquals("field1", "Hello World"); + out.assertFieldEquals("field2", "Logisland"); + out.assertFieldEquals("record_type", record1.getType()); + + MockRecord out1 = testRunner.getOutputRecords().get(0); + out1.assertRecordSizeEquals(2); + out1.assertFieldTypeEquals("field3", FieldType.INT); + out1.assertFieldTypeEquals("record_type", FieldType.STRING); + out1.assertFieldTypeEquals("record_time", FieldType.LONG); + out1.assertFieldEquals("field3", 1000); + out1.assertFieldEquals("record_type", record1.getType()); + + MockRecord out2 = testRunner.getOutputRecords().get(1); + out2.assertRecordSizeEquals(3); + out2.assertFieldTypeEquals("field1", FieldType.STRING); + out2.assertFieldTypeEquals("field3", FieldType.INT); + out2.assertFieldTypeEquals("record_type", FieldType.STRING); + out2.assertFieldTypeEquals("record_time", FieldType.LONG); + out2.assertFieldEquals("field1", "Hello World"); + out2.assertFieldEquals("field3", 1000); + out2.assertFieldEquals("record_type", record1.getType()); + + MockRecord out3 = testRunner.getOutputRecords().get(3); + out3.assertRecordSizeEquals(3); + out3.assertFieldTypeEquals("field1", FieldType.STRING); + out3.assertFieldTypeEquals("field2", FieldType.STRING); + out3.assertFieldTypeEquals("field3", FieldType.INT); + out3.assertFieldTypeEquals("record_type", FieldType.STRING); + out3.assertFieldTypeEquals("record_time", FieldType.LONG); + out3.assertFieldEquals("field1", "Hello World"); + out3.assertFieldEquals("field2", "Logisland"); + out3.assertFieldEquals("field3", 1000); + out3.assertFieldEquals("record_type", record1.getType()); + } + @Test + public void testValidFields() { + Record record1 = new StandardRecord(); + record1.setField("field1", FieldType.STRING, "Hello World"); + record1.setField("field2", FieldType.STRING, "Logisland"); + record1.setField("field3", FieldType.INT, 1000); + + TestRunner testRunner = TestRunners.newTestRunner(new SplitRecord()); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD, "false"); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD_TIME, "true"); + testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD_TYPE, "false"); + testRunner.setProperty("record_type1", "field5, field2"); + testRunner.setProperty("record_type2", "field3"); + testRunner.setProperty("record_type3", "field5, field3"); + testRunner.assertValid(); + } + + +} From bfbad8f104ee7a4462b85bb8e935464662e0fb0c Mon Sep 17 00:00:00 2001 From: FeizNouri Date: Mon, 7 Oct 2019 11:48:53 +0100 Subject: [PATCH 2/2] fixed some reviews --- .../logisland-processor-common/pom.xml | 6 - .../logisland/processor/SplitRecord.java | 22 +++- .../logisland/processor/SplitRecordTest.java | 121 +++++++++++++++++- 3 files changed, 132 insertions(+), 17 deletions(-) diff --git a/logisland-components/logisland-processors/logisland-processor-common/pom.xml b/logisland-components/logisland-processors/logisland-processor-common/pom.xml index 2b2fe51ac..bcafd566e 100644 --- a/logisland-components/logisland-processors/logisland-processor-common/pom.xml +++ b/logisland-components/logisland-processors/logisland-processor-common/pom.xml @@ -135,12 +135,6 @@ ${scala.version} compile - - org.scala-lang - scala-library - 2.12.9 - compile - diff --git a/logisland-components/logisland-processors/logisland-processor-common/src/main/java/com/hurence/logisland/processor/SplitRecord.java b/logisland-components/logisland-processors/logisland-processor-common/src/main/java/com/hurence/logisland/processor/SplitRecord.java index 3629b53cb..fd7de391e 100644 --- a/logisland-components/logisland-processors/logisland-processor-common/src/main/java/com/hurence/logisland/processor/SplitRecord.java +++ b/logisland-components/logisland-processors/logisland-processor-common/src/main/java/com/hurence/logisland/processor/SplitRecord.java @@ -5,6 +5,7 @@ import com.hurence.logisland.component.InitializationException; import com.hurence.logisland.component.PropertyDescriptor; import com.hurence.logisland.record.Field; +import com.hurence.logisland.record.FieldDictionary; import com.hurence.logisland.record.Record; import com.hurence.logisland.record.StandardRecord; import com.hurence.logisland.validator.StandardValidators; @@ -87,7 +88,7 @@ public Collection process(ProcessContext context, Collection rec final boolean keepParent = context.getPropertyValue(KEEP_PARENT_RECORD).asBoolean(); final boolean keepParentType = context.getPropertyValue(KEEP_PARENT_RECORD_TYPE).asBoolean(); final boolean keepParentTime = context.getPropertyValue(KEEP_PARENT_RECORD_TIME).asBoolean(); - Collection oringinRecods = new ArrayList<>(); + Collection oringinRecords = new ArrayList<>(); try { init(context); } catch (Throwable t) { @@ -104,10 +105,15 @@ public Collection process(ProcessContext context, Collection rec List fieldNames = getFieldsNames(fieldName); Map map = new HashMap<>(); Set> fields = record.getFieldsEntrySet(); + boolean errorField = false; + String errorList = ""; for (String field : fieldNames) { if (record.hasField(field)) map.put(field, record.getField(field)); - else throw new IllegalArgumentException("there is no field " + field); + else { + errorField = true; + errorList = errorList.concat(String.format("there is no field %s \n",field)); + } } Field idField = new Field(record.getId()); map.put("parent_record_id", idField); @@ -119,17 +125,21 @@ public Collection process(ProcessContext context, Collection rec if (keepParentType) { recordType = record.getType(); } else {recordType = newRecordType;} - Record newRecord = new StandardRecord().setFields(map).setType(recordType).setTime(recordTime).setId(newRecordType); - oringinRecods.add(newRecord); + if (errorField) { + oringinRecords.add(new StandardRecord().setFields(map).addError("there are some field(s) that don't exist : \n" + errorList).setType(recordType).setTime(recordTime).setId(UUID.randomUUID().toString())); + } else { + Record newRecord = new StandardRecord().setFields(map).setType(recordType).setTime(recordTime).setId(UUID.randomUUID().toString()); + oringinRecords.add(newRecord); + } } if (keepParent) { - oringinRecods.add(record); + oringinRecords.add(record); } } } catch (Throwable t) { getLogger().error("error while processing records ", t); } - return oringinRecods; + return oringinRecords; } private List getFieldsNames (String fields) { diff --git a/logisland-components/logisland-processors/logisland-processor-common/src/test/java/com/hurence/logisland/processor/SplitRecordTest.java b/logisland-components/logisland-processors/logisland-processor-common/src/test/java/com/hurence/logisland/processor/SplitRecordTest.java index effeb4faf..8e5acca0c 100644 --- a/logisland-components/logisland-processors/logisland-processor-common/src/test/java/com/hurence/logisland/processor/SplitRecordTest.java +++ b/logisland-components/logisland-processors/logisland-processor-common/src/test/java/com/hurence/logisland/processor/SplitRecordTest.java @@ -186,13 +186,124 @@ public void testValidFields() { record1.setField("field3", FieldType.INT, 1000); TestRunner testRunner = TestRunners.newTestRunner(new SplitRecord()); - testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD, "false"); - testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD_TIME, "true"); - testRunner.setProperty(SplitRecord.KEEP_PARENT_RECORD_TYPE, "false"); - testRunner.setProperty("record_type1", "field5, field2"); + testRunner.setProperty("record_type1", "field1, field2"); testRunner.setProperty("record_type2", "field3"); - testRunner.setProperty("record_type3", "field5, field3"); + testRunner.setProperty("record_type3", "field1, field3"); + testRunner.assertValid(); + testRunner.enqueue(record1); + testRunner.run(); + testRunner.assertAllInputRecordsProcessed(); + testRunner.assertOutputRecordsCount(3); + + MockRecord out = testRunner.getOutputRecords().get(2); + out.assertRecordSizeEquals(3); + out.assertFieldTypeEquals("record_time", FieldType.LONG); + out.assertFieldTypeEquals("record_type", FieldType.STRING); + out.assertFieldEquals("record_type", "record_type1"); + out.assertFieldEquals("record_time", record1.getTime().getTime()); + + MockRecord out1 = testRunner.getOutputRecords().get(0); + out1.assertRecordSizeEquals(2); + out1.assertFieldTypeEquals("record_time", FieldType.LONG); + out1.assertFieldTypeEquals("record_type", FieldType.STRING); + out1.assertFieldEquals("record_type", "record_type2"); + out1.assertFieldEquals("record_time", record1.getTime().getTime()); + + MockRecord out2 = testRunner.getOutputRecords().get(1); + out2.assertRecordSizeEquals(3); + out2.assertFieldTypeEquals("record_time", FieldType.LONG); + out2.assertFieldTypeEquals("record_type", FieldType.STRING); + out2.assertFieldEquals("record_type", "record_type3"); + out2.assertFieldEquals("record_time", record1.getTime().getTime()); + + Record record2 = new StandardRecord(); + record2.setField("field1", FieldType.STRING, "Hello World"); + record2.setField("field2", FieldType.STRING, "Logisland"); + record2.setField("field3", FieldType.INT, 1000); + + TestRunner testRunner1 = TestRunners.newTestRunner(new SplitRecord()); + testRunner1.setProperty(SplitRecord.KEEP_PARENT_RECORD, "true"); + testRunner1.setProperty(SplitRecord.KEEP_PARENT_RECORD_TIME, "false"); + testRunner1.setProperty(SplitRecord.KEEP_PARENT_RECORD_TYPE, "true"); + testRunner1.setProperty("record_type1", "field1, field2"); + testRunner1.setProperty("record_type2", "field3"); + testRunner1.setProperty("record_type3", "field1, field3"); + testRunner1.assertValid(); + testRunner1.enqueue(record2); + testRunner1.run(); + testRunner1.assertAllInputRecordsProcessed(); + testRunner1.assertOutputRecordsCount(4); + + MockRecord out4 = testRunner1.getOutputRecords().get(2); + out4.assertRecordSizeEquals(3); + out4.assertFieldTypeEquals("record_type", FieldType.STRING); + out4.assertFieldTypeEquals("record_time", FieldType.LONG); + out4.assertFieldEquals("record_type", record2.getType()); + + MockRecord out5 = testRunner1.getOutputRecords().get(0); + out5.assertRecordSizeEquals(2); + out5.assertFieldTypeEquals("record_type", FieldType.STRING); + out5.assertFieldTypeEquals("record_time", FieldType.LONG); + out5.assertFieldEquals("record_type", record2.getType()); + + MockRecord out6 = testRunner1.getOutputRecords().get(1); + out6.assertRecordSizeEquals(3); + out6.assertFieldTypeEquals("record_type", FieldType.STRING); + out6.assertFieldTypeEquals("record_time", FieldType.LONG); + out6.assertFieldEquals("record_type", record2.getType()); + + MockRecord out3 = testRunner1.getOutputRecords().get(3); + out3.assertRecordSizeEquals(3); + + out3.assertFieldTypeEquals("record_type", FieldType.STRING); + out3.assertFieldTypeEquals("record_time", FieldType.LONG); + out3.assertFieldEquals("record_type", record2.getType()); + } + @Test + public void testRecordsError() { + Record record1 = new StandardRecord(); + record1.setField("field1", FieldType.STRING, "Hello World"); + record1.setField("field2", FieldType.STRING, "Logisland"); + record1.setField("field3", FieldType.INT, 1000); + + TestRunner testRunner = TestRunners.newTestRunner(new SplitRecord()); + testRunner.setProperty("record_type1", "field5, field2"); + testRunner.setProperty("record_type2", "field789"); + testRunner.setProperty("record_type3", "field1, field3"); testRunner.assertValid(); + testRunner.enqueue(record1); + testRunner.run(); + testRunner.assertAllInputRecordsProcessed(); + testRunner.assertOutputRecordsCount(1); + + MockRecord out = testRunner.getOutputRecords().get(2); + out.assertRecordSizeEquals(3); + out.assertFieldTypeEquals("field2", FieldType.STRING); + out.assertFieldTypeEquals("record_time", FieldType.LONG); + out.assertFieldTypeEquals("record_type", FieldType.STRING); + out.assertFieldTypeEquals("record_errors", FieldType.ARRAY); + out.assertFieldEquals("field2", "Logisland"); + out.assertFieldEquals("record_type", "record_type1"); + out.assertFieldEquals("record_time", record1.getTime().getTime()); + + MockRecord out1 = testRunner.getOutputRecords().get(0); + out1.assertRecordSizeEquals(2); + out1.assertFieldTypeEquals("record_time", FieldType.LONG); + out1.assertFieldTypeEquals("record_type", FieldType.STRING); + out1.assertFieldTypeEquals("record_errors", FieldType.ARRAY); + out1.assertFieldEquals("record_type", "record_type2"); + out1.assertFieldEquals("record_time", record1.getTime().getTime()); + + MockRecord out2 = testRunner.getOutputRecords().get(1); + out2.assertRecordSizeEquals(3); + out2.assertFieldTypeEquals("field1", FieldType.STRING); + out2.assertFieldTypeEquals("field3", FieldType.INT); + out2.assertFieldTypeEquals("record_time", FieldType.LONG); + out2.assertFieldTypeEquals("record_type", FieldType.STRING); + out2.assertFieldEquals("field1", "Hello World"); + out2.assertFieldEquals("field3", 1000); + out2.assertFieldEquals("record_type", "record_type3"); + out2.assertFieldEquals("record_time", record1.getTime().getTime()); }