Attribution in advertising: request -> impression -> click
LinMingQiang committed Dec 25, 2020
1 parent 92a5c82 commit ff1c1d4
Showing 13 changed files with 537 additions and 79 deletions.
10 changes: 5 additions & 5 deletions flink-core/pom.xml
@@ -12,11 +12,11 @@
<artifactId>flink-core</artifactId>
<dependencies>
<!-- for log output -->
-<!-- <dependency>-->
-<!-- <groupId>org.slf4j</groupId>-->
-<!-- <artifactId>slf4j-simple</artifactId>-->
-<!-- <version>1.7.25</version>-->
-<!-- </dependency>-->
+<dependency>
+<groupId>org.slf4j</groupId>
+<artifactId>slf4j-simple</artifactId>
+<version>1.7.25</version>
+</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
105 changes: 105 additions & 0 deletions flink-core/src/main/java/com/flink/learn/bean/ReportLogPojo.java
@@ -0,0 +1,105 @@
package com.flink.learn.bean;

public class ReportLogPojo {
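// One record per req_id: request / impression / click event times plus per-stage counters (req, imp, click).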
public String app_id;
public Long reqTime = 0L;
public Long impTime = 0L;
public Long clickTime = 0L;
public String req_id;
public Long req;
public Long imp;
public Long click;

public ReportLogPojo() {
}

@Override
public String toString() {
return "ReportLogPojo{" +
"app_id='" + app_id + '\'' +
", reqTime=" + reqTime +
", impTime=" + impTime +
", clickTime=" + clickTime +
", req_id='" + req_id + '\'' +
", req=" + req +
", imp=" + imp +
", click=" + click +
'}';
}

public String getApp_id() {
return app_id;
}

public void setApp_id(String app_id) {
this.app_id = app_id;
}

public Long getReqTime() {
return reqTime;
}

public void setReqTime(Long reqTime) {
this.reqTime = reqTime;
}

public Long getImpTime() {
return impTime;
}

public void setImpTime(Long impTime) {
this.impTime = impTime;
}

public String getReq_id() {
return req_id;
}

public Long getClickTime() {
return clickTime;
}

public void setClickTime(Long clickTime) {
this.clickTime = clickTime;
}

public void setReq_id(String req_id) {
this.req_id = req_id;
}

public Long getReq() {
return req;
}

public void setReq(Long req) {
this.req = req;
}

public Long getImp() {
return imp;
}

public void setImp(Long imp) {
this.imp = imp;
}

public Long getClick() {
return click;
}

public void setClick(Long click) {
this.click = click;
}

public ReportLogPojo(String req_id, String app_id, Long reqTime, Long impTime, Long clickTime, Long req, Long imp, Long click) {
this.app_id = app_id;
this.impTime = impTime;
this.req_id = req_id;
this.req = req;
this.imp = imp;
this.click = click;
this.reqTime = reqTime;
this.clickTime = clickTime;
}
}

197 changes: 197 additions & 0 deletions flink-core/src/main/java/com/flink/learn/entry/FlinkStreamAttributEntry.java
@@ -0,0 +1,197 @@
package com.flink.learn.entry;

import com.flink.common.core.EnvironmentalKey;
import com.flink.common.core.FlinkLearnPropertiesUtil;
import com.flink.common.deserialize.TopicOffsetJsonEventtimeDeserialize;
import com.flink.common.deserialize.TopicOffsetTimeStampMsgDeserialize;
import com.flink.common.java.core.FlinkEvnBuilder;
import com.flink.common.java.manager.KafkaSourceManager;
import com.flink.common.kafka.KafkaManager;
import com.flink.learn.bean.ReportLogPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkStreamAttributEntry {
public static StreamExecutionEnvironment streamEnv = null;

// test2: {"ts":5,"msg":"hello3"}
// test1: {"ts":5,"msg":"hello2"} {"ts":10,"msg":"hello"} {"ts":115,"msg":"hello"}
// test3:
// Three results are emitted.
// test1: {"ts":20,"msg":"hello"} is NOT emitted, because it is more than 10s away and cannot join test2's ts=5 record.
// test2: {"ts":10,"msg":"hello"}
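// With between(-10s, +10s), a left element at ts=20 only joins right elements whose ts lies in [10, 30],
// so test2's ts=5 record falls outside the join window.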

/**
* Keep the parallelism low, otherwise RocksDB errors are thrown.
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
FlinkLearnPropertiesUtil.init(EnvironmentalKey.LOCAL_PROPERTIES_PATH(),
"WordCountEntry");
streamEnv = FlinkEvnBuilder.buildStreamingEnv(FlinkLearnPropertiesUtil.param(),
FlinkLearnPropertiesUtil.CHECKPOINT_PATH(),
FlinkLearnPropertiesUtil.CHECKPOINT_INTERVAL());
SingleOutputStreamOperator<ReportLogPojo> req =
KafkaSourceManager.getKafkaDataStream(streamEnv,
"test",
"localhost:9092",
"latest", new TopicOffsetJsonEventtimeDeserialize())
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<KafkaManager.KafkaTopicOffsetTimeMsg>(Time.seconds(10)) {
@Override
public long extractTimestamp(KafkaManager.KafkaTopicOffsetTimeMsg element) {
return element.ts();
}
})
.keyBy((KeySelector<KafkaManager.KafkaTopicOffsetTimeMsg, String>) value -> value.msg())
.process(new KeyedProcessFunction<String, KafkaManager.KafkaTopicOffsetTimeMsg, ReportLogPojo>() {
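// Keyed dedup: emit only the first event seen for each key, tracked in a Boolean ValueState.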
ValueState<Boolean> has = null;

@Override
public void open(Configuration parameters) throws Exception {
has = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("has", Types.BOOLEAN, false));
}

@Override
public void processElement(KafkaManager.KafkaTopicOffsetTimeMsg v1, Context context, Collector<ReportLogPojo> collector) throws Exception {
if (!has.value()) {
has.update(true);
collector.collect(new ReportLogPojo(v1.msg(), v1.msg(), v1.ts(), 0L, 0L, 1L, 0L, 0L));
}
}
});

SingleOutputStreamOperator<ReportLogPojo> cd2 =
KafkaSourceManager.getKafkaDataStream(streamEnv,
"test2",
"localhost:9092",
"latest", new TopicOffsetJsonEventtimeDeserialize())
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<KafkaManager.KafkaTopicOffsetTimeMsg>(Time.seconds(10)) {
@Override
public long extractTimestamp(KafkaManager.KafkaTopicOffsetTimeMsg element) {
return element.ts();
}
})
.keyBy((KeySelector<KafkaManager.KafkaTopicOffsetTimeMsg, String>) value -> value.msg())
.process(new KeyedProcessFunction<String, KafkaManager.KafkaTopicOffsetTimeMsg, ReportLogPojo>() {
ValueState<Boolean> has = null;

@Override
public void open(Configuration parameters) throws Exception {
has = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("has2", Types.BOOLEAN, false));
}

@Override
public void processElement(KafkaManager.KafkaTopicOffsetTimeMsg v1, Context context, Collector<ReportLogPojo> collector) throws Exception {
if (!has.value()) {
has.update(true);
collector.collect(new ReportLogPojo(v1.msg(), v1.msg(), 0L, v1.ts(), 0L, 0L, 0L, 0L));
}
}
});


SingleOutputStreamOperator<ReportLogPojo> cd3 =
KafkaSourceManager.getKafkaDataStream(streamEnv,
"test3",
"localhost:9092",
"latest", new TopicOffsetJsonEventtimeDeserialize())
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<KafkaManager.KafkaTopicOffsetTimeMsg>(Time.seconds(10)) {
@Override
public long extractTimestamp(KafkaManager.KafkaTopicOffsetTimeMsg element) {
return element.ts();
}
})
.keyBy((KeySelector<KafkaManager.KafkaTopicOffsetTimeMsg, String>) value -> value.msg())
.process(new KeyedProcessFunction<String, KafkaManager.KafkaTopicOffsetTimeMsg, ReportLogPojo>() {
ValueState<Boolean> has = null;

@Override
public void open(Configuration parameters) throws Exception {
has = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("has3", Types.BOOLEAN, false));
}

@Override
public void processElement(KafkaManager.KafkaTopicOffsetTimeMsg v1, Context context, Collector<ReportLogPojo> collector) throws Exception {
if (!has.value()) {
has.update(true);
collector.collect(new ReportLogPojo(v1.msg(), v1.msg(), 0L, 0L, v1.ts(), 0L, 0L, 0L));
}
}
});
run(req, cd2, cd3);
streamEnv.execute();
}


public static void run(SingleOutputStreamOperator<ReportLogPojo> req,
SingleOutputStreamOperator<ReportLogPojo> impsrc,
SingleOutputStreamOperator<ReportLogPojo> clicksrc) {
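
// Step 1: attribute impressions to requests - interval-join the two streams on req_id, keeping impressions within ±10s of the request.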

SingleOutputStreamOperator<ReportLogPojo> imp = req.keyBy((KeySelector<ReportLogPojo, String>) value -> value.req_id)
.intervalJoin(
impsrc.keyBy((KeySelector<ReportLogPojo, String>) value -> value.req_id))
.between(Time.seconds(-10), Time.seconds(10)) // within ±10s
.process(new ProcessJoinFunction<ReportLogPojo, ReportLogPojo, ReportLogPojo>() {
@Override
public void processElement(ReportLogPojo v1, ReportLogPojo v2, Context context, Collector<ReportLogPojo> collector) throws Exception {
if (v1.reqTime <= v2.impTime) {
collector.collect(new ReportLogPojo(v1.req_id, v1.app_id, v1.reqTime, v2.impTime, 0L, 0L, 1L, 0L));
}
}
});
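
// Step 2: attribute clicks to the attributed impressions - same ±10s interval join on req_id, keeping clicks at or after the impression.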

SingleOutputStreamOperator<ReportLogPojo> click = imp.keyBy((KeySelector<ReportLogPojo, String>) v -> v.req_id)
.intervalJoin(
clicksrc.keyBy((KeySelector<ReportLogPojo, String>) value -> value.req_id))
.between(Time.seconds(-10), Time.seconds(10))
.process(new ProcessJoinFunction<ReportLogPojo, ReportLogPojo, ReportLogPojo>() {
@Override
public void processElement(ReportLogPojo imp, ReportLogPojo click, Context context, Collector<ReportLogPojo> collector) throws Exception {
if (click.clickTime >= imp.impTime) {
collector.collect(new ReportLogPojo(imp.req_id, imp.app_id, imp.reqTime, imp.impTime, click.clickTime, 0L, 0L, 1L));
}
}
});


req
.map(x -> new Tuple2<>(x.reqTime / 1000L, x.req))
.returns(Types.TUPLE(Types.LONG, Types.LONG))
.keyBy(x -> x.f0)
.sum(1)
.print();


imp.map(x -> new Tuple2<>(x.reqTime / 1000L, x.imp))
.returns(Types.TUPLE(Types.LONG, Types.LONG))
.keyBy(x -> x.f0)
.sum(1)
.print();


click.map(x -> new Tuple2<>(x.reqTime / 1000L, x.click))
.returns(Types.TUPLE(Types.LONG, Types.LONG))
.keyBy(x -> x.f0)
.sum(1)
.print();

}
}
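
To replay the sample messages from the comments above into the three topics, something like the following producer can be used (a minimal sketch, assuming kafka-clients on the classpath and a broker at localhost:9092; the class name SendTestEvents and the ts values are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendTestEvents {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            // request on "test", impression on "test2", click on "test3";
            // the same msg key joins within the ±10s interval, and clickTime >= impTime >= reqTime.
            producer.send(new ProducerRecord<>("test", "{\"ts\":10,\"msg\":\"hello\"}"));
            producer.send(new ProducerRecord<>("test2", "{\"ts\":10,\"msg\":\"hello\"}"));
            producer.send(new ProducerRecord<>("test3", "{\"ts\":12,\"msg\":\"hello\"}"));
        }
    }
}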