Skip to content

Commit

Permalink
Add transformers among formats CSV, Json, KV, Columns and Triple. See #…
Browse files Browse the repository at this point in the history
  • Loading branch information
chengscu authored and shaomeng.wang committed Jun 4, 2020
1 parent 2c1a195 commit 8ee1b4f
Show file tree
Hide file tree
Showing 155 changed files with 3,527 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ protected TableSchema getDataSchema() {
*/
public abstract TableSchema getOutputSchema();

/**
 * Lifecycle hook whose default implementation does nothing; subclasses may override it.
 * NOTE(review): presumably called once before any rows are processed - confirm against the caller.
 */
public void open() {
}

/**
 * Lifecycle hook whose default implementation does nothing; subclasses may override it.
 * NOTE(review): presumably called once after processing has finished - confirm against the caller.
 */
public void close() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,32 @@

import java.io.Serializable;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/**
* Adapt a {@link FlatMapper} to run within Flink.
*/
public class FlatMapperAdapter implements FlatMapFunction<Row, Row>, Serializable {
public class FlatMapperAdapter extends RichFlatMapFunction<Row, Row> implements Serializable {

private final FlatMapper mapper;

/**
 * Creates an adapter around the given flat mapper.
 *
 * @param mapper the {@link FlatMapper} this adapter delegates to; stored as-is, without a null check
 */
public FlatMapperAdapter(FlatMapper mapper) {
this.mapper = mapper;
}

/**
 * Forwards the open callback to the wrapped mapper.
 *
 * @param parameters configuration passed in by the runtime; not used by this method
 * @throws Exception if the wrapped mapper's {@code open()} throws
 */
@Override
public void open(Configuration parameters) throws Exception {
    this.mapper.open();
}

/**
 * Forwards the close callback to the wrapped mapper.
 *
 * @throws Exception if the wrapped mapper's {@code close()} throws
 */
@Override
public void close() throws Exception {
    this.mapper.close();
}

@Override
public void flatMap(Row value, Collector<Row> out) throws Exception {
this.mapper.flatMap(value, out);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.batch.utils.FlatMapBatchOp;
import com.alibaba.alink.operator.common.dataproc.format.AnyToTripleFlatMapper;
import com.alibaba.alink.operator.common.dataproc.format.FormatTransParams;
import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.ToTripleParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that hands {@code AnyToTripleFlatMapper::new} to {@link FlatMapBatchOp} as its mapper
 * factory. The source format is recorded in the params under {@link FormatTransParams#FROM_FORMAT}.
 *
 * @param <T> the concrete operator type (self-bounded so subclasses keep their own type)
 */
public class AnyToTripleBatchOp<T extends AnyToTripleBatchOp<T>> extends FlatMapBatchOp<T>
    implements ToTripleParams<T> {

    /**
     * Creates the operator without a params object; {@code null} is forwarded to
     * {@link #AnyToTripleBatchOp(Params)}.
     */
    public AnyToTripleBatchOp() {
        this((Params) null);
    }

    /**
     * Creates the operator and records the source format in the params.
     *
     * @param formatType format of the input data, stored under {@link FormatTransParams#FROM_FORMAT}
     * @param params     operator parameters; when {@code null} a new {@link Params} is created,
     *                   otherwise the given instance itself is updated with the source format
     */
    public AnyToTripleBatchOp(FormatType formatType, Params params) {
        this(withFromFormat(formatType, params));
    }

    /**
     * Creates the operator from ready-made params.
     *
     * @param params operator parameters, passed to the superclass unchanged
     */
    public AnyToTripleBatchOp(Params params) {
        super(AnyToTripleFlatMapper::new, params);
    }

    /** Returns {@code params} (or a new instance when it is {@code null}) with the source format set. */
    private static Params withFromFormat(FormatType formatType, Params params) {
        Params target = params;
        if (target == null) {
            target = new Params();
        }
        return target.set(FormatTransParams.FROM_FORMAT, formatType);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.alibaba.alink.operator.batch.dataproc.format;

import org.apache.flink.ml.api.misc.param.Params;

import com.alibaba.alink.operator.batch.utils.MapBatchOp;
import com.alibaba.alink.operator.common.dataproc.format.FormatTransMapper;
import com.alibaba.alink.operator.common.dataproc.format.FormatTransParams;
import com.alibaba.alink.operator.common.dataproc.format.FormatType;

/**
 * Base batch operator for format transformations. It hands {@code FormatTransMapper::new} to
 * {@link MapBatchOp} as its mapper factory and records the source and target {@link FormatType} in the
 * params under {@link FormatTransParams#FROM_FORMAT} and {@link FormatTransParams#TO_FORMAT}.
 *
 * @param <T> the concrete operator type (self-bounded so subclasses keep their own type)
 */
public class BaseFormatTransBatchOp<T extends BaseFormatTransBatchOp<T>> extends MapBatchOp<T> {

    /** Private: forwards {@code null} to the private params constructor. */
    private BaseFormatTransBatchOp() {
        this((Params) null);
    }

    /**
     * Creates the operator and records both formats in the params.
     *
     * @param fromFormat format of the input data, stored under {@link FormatTransParams#FROM_FORMAT}
     * @param toFormat   format of the output data, stored under {@link FormatTransParams#TO_FORMAT}
     * @param params     operator parameters; when {@code null} a new {@link Params} is created,
     *                   otherwise the given instance itself is updated with both formats
     */
    public BaseFormatTransBatchOp(FormatType fromFormat, FormatType toFormat, Params params) {
        this(withFormats(fromFormat, toFormat, params));
    }

    /** Private: passes the params to the superclass unchanged. */
    private BaseFormatTransBatchOp(Params params) {
        super(FormatTransMapper::new, params);
    }

    /** Returns {@code params} (or a new instance when it is {@code null}) with both formats set. */
    private static Params withFormats(FormatType fromFormat, FormatType toFormat, Params params) {
        Params target = params;
        if (target == null) {
            target = new Params();
        }
        return target
            .set(FormatTransParams.FROM_FORMAT, fromFormat)
            .set(FormatTransParams.TO_FORMAT, toFormat);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.ColumnsToCsvParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#COLUMNS} to
 * {@link FormatType#CSV}. Its parameters are exposed through {@link ColumnsToCsvParams}.
 */
public class ColumnsToCsvBatchOp extends BaseFormatTransBatchOp<ColumnsToCsvBatchOp>
    implements ColumnsToCsvParams<ColumnsToCsvBatchOp> {

    /**
     * Creates the operator with COLUMNS as the source format and CSV as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public ColumnsToCsvBatchOp(Params params) {
        super(FormatType.COLUMNS, FormatType.CSV, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public ColumnsToCsvBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.ColumnsToJsonParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#COLUMNS} to
 * {@link FormatType#JSON}. Its parameters are exposed through {@link ColumnsToJsonParams}.
 */
public class ColumnsToJsonBatchOp extends BaseFormatTransBatchOp<ColumnsToJsonBatchOp>
    implements ColumnsToJsonParams<ColumnsToJsonBatchOp> {

    /**
     * Creates the operator with COLUMNS as the source format and JSON as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public ColumnsToJsonBatchOp(Params params) {
        super(FormatType.COLUMNS, FormatType.JSON, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public ColumnsToJsonBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.ColumnsToKvParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#COLUMNS} to
 * {@link FormatType#KV}. Its parameters are exposed through {@link ColumnsToKvParams}.
 */
public class ColumnsToKvBatchOp extends BaseFormatTransBatchOp<ColumnsToKvBatchOp>
    implements ColumnsToKvParams<ColumnsToKvBatchOp> {

    /**
     * Creates the operator with COLUMNS as the source format and KV as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public ColumnsToKvBatchOp(Params params) {
        super(FormatType.COLUMNS, FormatType.KV, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public ColumnsToKvBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.ColumnsToTripleParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures an {@link AnyToTripleBatchOp} with {@link FormatType#COLUMNS} as the
 * source format. Its parameters are exposed through {@link ColumnsToTripleParams}.
 */
public class ColumnsToTripleBatchOp extends AnyToTripleBatchOp<ColumnsToTripleBatchOp>
    implements ColumnsToTripleParams<ColumnsToTripleBatchOp> {

    private static final long serialVersionUID = 7543648266815893977L;

    /**
     * Creates the operator with COLUMNS as the source format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public ColumnsToTripleBatchOp(Params params) {
        super(FormatType.COLUMNS, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public ColumnsToTripleBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.ColumnsToVectorParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#COLUMNS} to
 * {@link FormatType#VECTOR}. Its parameters are exposed through {@link ColumnsToVectorParams}.
 */
public class ColumnsToVectorBatchOp extends BaseFormatTransBatchOp<ColumnsToVectorBatchOp>
    implements ColumnsToVectorParams<ColumnsToVectorBatchOp> {

    /**
     * Creates the operator with COLUMNS as the source format and VECTOR as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public ColumnsToVectorBatchOp(Params params) {
        super(FormatType.COLUMNS, FormatType.VECTOR, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public ColumnsToVectorBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.CsvToColumnsParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#CSV} to
 * {@link FormatType#COLUMNS}. Its parameters are exposed through {@link CsvToColumnsParams}.
 */
public class CsvToColumnsBatchOp extends BaseFormatTransBatchOp<CsvToColumnsBatchOp>
    implements CsvToColumnsParams<CsvToColumnsBatchOp> {

    /**
     * Creates the operator with CSV as the source format and COLUMNS as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public CsvToColumnsBatchOp(Params params) {
        super(FormatType.CSV, FormatType.COLUMNS, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public CsvToColumnsBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.CsvToJsonParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#CSV} to
 * {@link FormatType#JSON}. Its parameters are exposed through {@link CsvToJsonParams}.
 */
public class CsvToJsonBatchOp extends BaseFormatTransBatchOp<CsvToJsonBatchOp>
    implements CsvToJsonParams<CsvToJsonBatchOp> {

    /**
     * Creates the operator with CSV as the source format and JSON as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public CsvToJsonBatchOp(Params params) {
        super(FormatType.CSV, FormatType.JSON, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public CsvToJsonBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.CsvToKvParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#CSV} to
 * {@link FormatType#KV}. Its parameters are exposed through {@link CsvToKvParams}.
 */
public class CsvToKvBatchOp extends BaseFormatTransBatchOp<CsvToKvBatchOp>
    implements CsvToKvParams<CsvToKvBatchOp> {

    /**
     * Creates the operator with CSV as the source format and KV as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public CsvToKvBatchOp(Params params) {
        super(FormatType.CSV, FormatType.KV, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public CsvToKvBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.CsvToTripleParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures an {@link AnyToTripleBatchOp} with {@link FormatType#CSV} as the
 * source format. Its parameters are exposed through {@link CsvToTripleParams}.
 */
public class CsvToTripleBatchOp extends AnyToTripleBatchOp<CsvToTripleBatchOp>
    implements CsvToTripleParams<CsvToTripleBatchOp> {

    private static final long serialVersionUID = 7543648266815893977L;

    /**
     * Creates the operator with CSV as the source format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public CsvToTripleBatchOp(Params params) {
        super(FormatType.CSV, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public CsvToTripleBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.CsvToVectorParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#CSV} to
 * {@link FormatType#VECTOR}. Its parameters are exposed through {@link CsvToVectorParams}.
 */
public class CsvToVectorBatchOp extends BaseFormatTransBatchOp<CsvToVectorBatchOp>
    implements CsvToVectorParams<CsvToVectorBatchOp> {

    /**
     * Creates the operator with CSV as the source format and VECTOR as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public CsvToVectorBatchOp(Params params) {
        super(FormatType.CSV, FormatType.VECTOR, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public CsvToVectorBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.JsonToColumnsParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#JSON} to
 * {@link FormatType#COLUMNS}. Its parameters are exposed through {@link JsonToColumnsParams}.
 */
public class JsonToColumnsBatchOp extends BaseFormatTransBatchOp<JsonToColumnsBatchOp>
    implements JsonToColumnsParams<JsonToColumnsBatchOp> {

    /**
     * Creates the operator with JSON as the source format and COLUMNS as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public JsonToColumnsBatchOp(Params params) {
        super(FormatType.JSON, FormatType.COLUMNS, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public JsonToColumnsBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.JsonToCsvParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#JSON} to
 * {@link FormatType#CSV}. Its parameters are exposed through {@link JsonToCsvParams}.
 */
public class JsonToCsvBatchOp extends BaseFormatTransBatchOp<JsonToCsvBatchOp>
    implements JsonToCsvParams<JsonToCsvBatchOp> {

    /**
     * Creates the operator with JSON as the source format and CSV as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public JsonToCsvBatchOp(Params params) {
        super(FormatType.JSON, FormatType.CSV, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public JsonToCsvBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.JsonToKvParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#JSON} to
 * {@link FormatType#KV}. Its parameters are exposed through {@link JsonToKvParams}.
 */
public class JsonToKvBatchOp extends BaseFormatTransBatchOp<JsonToKvBatchOp>
    implements JsonToKvParams<JsonToKvBatchOp> {

    /**
     * Creates the operator with JSON as the source format and KV as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public JsonToKvBatchOp(Params params) {
        super(FormatType.JSON, FormatType.KV, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public JsonToKvBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.JsonToTripleParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures an {@link AnyToTripleBatchOp} with {@link FormatType#JSON} as the
 * source format. Its parameters are exposed through {@link JsonToTripleParams}.
 */
public class JsonToTripleBatchOp extends AnyToTripleBatchOp<JsonToTripleBatchOp>
    implements JsonToTripleParams<JsonToTripleBatchOp> {

    private static final long serialVersionUID = 7543648266815893977L;

    /**
     * Creates the operator with JSON as the source format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public JsonToTripleBatchOp(Params params) {
        super(FormatType.JSON, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public JsonToTripleBatchOp() {
        this(new Params());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

package com.alibaba.alink.operator.batch.dataproc.format;

import com.alibaba.alink.operator.common.dataproc.format.FormatType;
import com.alibaba.alink.params.dataproc.format.JsonToVectorParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
 * Batch operator that configures a format transformation from {@link FormatType#JSON} to
 * {@link FormatType#VECTOR}. Its parameters are exposed through {@link JsonToVectorParams}.
 */
public class JsonToVectorBatchOp extends BaseFormatTransBatchOp<JsonToVectorBatchOp>
    implements JsonToVectorParams<JsonToVectorBatchOp> {

    /**
     * Creates the operator with JSON as the source format and VECTOR as the target format.
     *
     * @param params operator parameters, forwarded to the base constructor
     */
    public JsonToVectorBatchOp(Params params) {
        super(FormatType.JSON, FormatType.VECTOR, params);
    }

    /** Creates the operator with a newly created {@link Params}. */
    public JsonToVectorBatchOp() {
        this(new Params());
    }
}
Loading

0 comments on commit 8ee1b4f

Please sign in to comment.