kudusource to kudusink #969

Closed
huaxiapanda opened this issue Jun 17, 2022 · 0 comments · Fixed by #970
Labels
bug Something isn't working

Comments

@huaxiapanda

Scenario: kuduSource to KuduSink
json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "kudureader",
          "parameter": {
            "column": [
              {
                "name": "REPORT_TIME",
                "type": "UNIXTIME_MICROS"
              },
              {
                "name": "VEHICLE_TAG",
                "type": "STRING"
              },
              {
                "name": "REV",
                "type": "INT32"
              },
              {
                "name": "LONGITUDE",
                "type": "DOUBLE"
              },
              {
                "name": "LATITUDE",
                "type": "DOUBLE"
              },
              {
                "name": "SPEED",
                "type": "DOUBLE"
              },
              {
                "name": "HEADING",
                "type": "DOUBLE"
              },
              {
                "name": "TRAIN_ASSIGNMENT",
                "type": "STRING"
              },
              {
                "name": "PREDICTABLE",
                "type": "INT32"
              }
            ],
            "masters": "172.29.207.143:7051,172.29.207.143:7151,172.29.207.143:7251",
            "table": "sfmta_kudu",
            "readMode": "read_latest",
            "workerCount": 2,
            "operationTimeout": 30000,
            "adminOperationTimeout": 30000,
            "queryTimeout": 30000,
            "batchSizeBytes": 1048576
          }
        },
        "writer": {
          "parameter": {
            "column": [
              {
                "name": "REPORT_TIME",
                "type": "UNIXTIME_MICROS"
              },
              {
                "name": "VEHICLE_TAG",
                "type": "STRING"
              },
              {
                "name": "REV",
                "type": "INT32"
              },
              {
                "name": "LONGITUDE",
                "type": "DOUBLE"
              },
              {
                "name": "LATITUDE",
                "type": "DOUBLE"
              },
              {
                "name": "SPEED",
                "type": "DOUBLE"
              },
              {
                "name": "HEADING",
                "type": "DOUBLE"
              },
              {
                "name": "TRAIN_ASSIGNMENT",
                "type": "STRING"
              },
              {
                "name": "PREDICTABLE",
                "type": "INT32"
              }
            ],
            "masters": "localhost:7051",
            "table": "sfmta_kudu_one",
            "flushMode": "manual_flush",
            "writeMode": "append",
            "batchSizeBytes": 1048576
          },
          "name": "kuduwriter"
        }
      }
    ],
    "setting": {
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "errorLimit": {
        "record": 100
      },
      "speed": {
        "bytes": 0,
        "channel": 1
      },
      "log": {
        "isLogger": false,
        "level": "debug",
        "path": "",
        "pattern": ""
      }
    }
  }
}

Command
bin/start-chunjun -mode local -jobType sync -job ./chunjun-examples/json/kudu/kudu_stream_test.json -chunjunDistDir chunjun-dist

Result

java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
at com.dtstack.chunjun.connector.kudu.sink.KuduOutputFormat.writeSingleRecordInternal(KuduOutputFormat.java:77)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:466)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:272)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:92)
at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:117)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
at com.dtstack.chunjun.connector.kudu.converter.KuduColumnConverter.lambda$createExternalConverter$3ab0349a$5(KuduColumnConverter.java:175)
at com.dtstack.chunjun.connector.kudu.converter.KuduColumnConverter.lambda$wrapIntoNullableExternalConverter$4be2f56a$1(KuduColumnConverter.java:86)
at com.dtstack.chunjun.connector.kudu.converter.KuduColumnConverter.toExternal(KuduColumnConverter.java:106)
at com.dtstack.chunjun.connector.kudu.converter.KuduColumnConverter.toExternal(KuduColumnConverter.java:52)
at com.dtstack.chunjun.connector.kudu.sink.KuduOutputFormat.writeSingleRecordInternal(KuduOutputFormat.java:72)
... 16 more
', fieldName='null', createTime=2022-06-16 13:55:57.308]

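Analysis
Kudu's UNIXTIME_MICROS type was originally designed to map to the Flink Table BIGINT type. When writing through the Kudu sink, however, the value held in the RowData is a java.sql.Timestamp, so the converter's call to GenericRowData.getLong fails with the ClassCastException shown above.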
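
To make the mismatch concrete, here is a minimal standalone sketch in plain Java with no Flink or Kudu dependencies. It is not the change merged in #970. The class name `UnixtimeMicrosSketch` and the helper `toEpochMicros` are hypothetical names invented for this illustration. The sketch assumes the sink side may receive either a `Long` (already in microseconds) or a `java.sql.Timestamp` for a UNIXTIME_MICROS column, and normalizes both to epoch microseconds instead of casting blindly.

```java
import java.sql.Timestamp;

public class UnixtimeMicrosSketch {

    /**
     * Hypothetical helper: normalizes a field value to microseconds since the epoch.
     * Accepts a Long (assumed to already be microseconds) or a java.sql.Timestamp.
     */
    public static long toEpochMicros(Object value) {
        if (value instanceof Long) {
            return (Long) value;
        }
        if (value instanceof Timestamp) {
            Timestamp ts = (Timestamp) value;
            // getTime() is epoch milliseconds; getNanos() is the full fractional second.
            // floorDiv keeps the whole-second part correct for pre-1970 values.
            long seconds = Math.floorDiv(ts.getTime(), 1000L);
            return seconds * 1_000_000L + ts.getNanos() / 1_000L;
        }
        throw new IllegalArgumentException(
                "Unsupported value for UNIXTIME_MICROS: "
                        + (value == null ? "null" : value.getClass().getName()));
    }

    public static void main(String[] args) {
        // A Timestamp stored in an Object slot, as in the failing row field.
        Object field = new Timestamp(1655387757308L);

        try {
            // Same kind of unchecked cast that fails in the reported stack trace.
            long broken = (Long) field;
            System.out.println("unexpected: " + broken);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }

        // Type-aware conversion instead of a blind cast.
        System.out.println("micros: " + toEpochMicros(field));
    }
}
```

Whether the proper fix belongs on the reader side (emit a BIGINT as the original design intended) or on the writer side (accept a timestamp) is a design decision for the connector. The sketch only shows why the direct cast cannot work and what a type-aware conversion looks like.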

@huaxiapanda huaxiapanda added the bug Something isn't working label Jun 17, 2022
huaxiapanda pushed a commit to huaxiapanda/chunjun that referenced this issue Jun 17, 2022
FlechazoW pushed a commit that referenced this issue Jun 19, 2022