
[tis-flink-cdc-postgresql-plugin] java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.time.Instant #293

Closed
baisui1981 opened this issue Dec 8, 2023 · 3 comments
Labels: bug (Something isn't working)
Milestone: v4.0.0

baisui1981 commented Dec 8, 2023

Exception log

An error occurred while using the [tis-flink-cdc-postgresql-plugin] for incremental synchronization:
2023-12-08 10:31:15,068 ERROR io.debezium.relational.TableSchemaBuilder                    [] - Failed to properly convert data value for 'public.test.cjsj' of type timestamp for row [611555857792105413, 1, 611555857792105397, 611555857792105411, null, 0, 王五', null, null, null, null, null, null, 1, 身份证, 8325611964509998, XXX, null, 142, null, 450000, null, null, null, 1***********, null, null, null, null, 1, null, null, null, null, 1***********, null, null, 1, null, null, null, testuser, 2023-10-24T17:52:09.000+0800, testuser, 2023-10-24T17:56:29.000+0800, null, null, null, null, null]:
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.time.Instant
	at com.qlangtech.plugins.incr.flink.cdc.postgresql.PGDateTimeConverter.convertTimestamp(PGDateTimeConverter.java:71) ~[tis-flink-cdc-postgresql-plugin.jar:3.8.0]
	at io.debezium.relational.CustomConverterRegistry.lambda$getValueConverter$0(CustomConverterRegistry.java:150) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:264) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:85) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:48) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:377) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:305) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:138) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:71) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:110) ~[debezium-core-1.5.4.Final.jar:1.5.4.Final]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_392]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_392]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_392]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_392]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_392]
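The stack trace shows the converter receiving a java.sql.Timestamp during the snapshot phase and casting it directly to java.time.Instant. A minimal sketch of the failure mode (assumed class name, not the plugin's actual code): Timestamp extends java.util.Date, not Instant, so the cast fails at runtime, while Timestamp.toInstant() converts safely.

```java
import java.sql.Timestamp;
import java.time.Instant;

// Minimal sketch reproducing the reported ClassCastException.
// TimestampCastDemo is a hypothetical name; PGDateTimeConverter's real code differs.
public class TimestampCastDemo {

    public static Instant toInstant(Object input) {
        if (input instanceof Timestamp) {
            // java.sql.Timestamp extends java.util.Date, not java.time.Instant
            return ((Timestamp) input).toInstant();
        }
        if (input instanceof Instant) {
            return (Instant) input;
        }
        throw new IllegalArgumentException("unsupported type: " + input.getClass());
    }

    public static void main(String[] args) {
        Object snapshotValue = Timestamp.valueOf("2023-10-24 17:52:09");
        try {
            Instant broken = (Instant) snapshotValue; // reproduces the reported error
        } catch (ClassCastException e) {
            System.out.println("direct cast fails: " + e);
        }
        System.out.println("safe conversion: " + toInstant(snapshotValue));
    }
}
```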

DDL

CREATE TABLE public.test (
	testid numeric NOT NULL,
	sxh int4 NULL,
	testmc varchar(2000) NULL,
	zjzl varchar(2) NULL,
	zjzlmc varchar(100) NULL,
	zjh varchar(200) NULL,
	hjszss varchar(6) NULL,
	dh varchar(50) NULL,
	dz varchar(200) NULL,
	testlx varchar(2) NULL,
	testlxmc varchar(100) NULL,
	yxbz int4 NULL,
	cjr varchar(50) NULL,
	cjsj timestamp NULL,
	zhxgr varchar(50) NULL,
	zhxgsj timestamp NULL,
	scr varchar(50) NULL,
	scsj timestamp NULL,
	bz varchar(2000) NULL,
	CONSTRAINT test_pkey PRIMARY KEY (testid)
);

INSERT

INSERT INTO public.test (testid, sxh, testmc, zjzl, zjzlmc, zjh, hjszss, dh, dz, testlx, testlxmc, yxbz, cjr, cjsj, zhxgr, zhxgsj, scr, scsj, bz) VALUES(611555862087072406, 0, '王五', '1', '身份证', '8325611964509998', '400000', '19977589876', '测试地址5栋3单元2楼', '1', NULL, 1, 'testuser', '2023-10-24 18:18:43.000', 'testuser_dengji', '2023-10-24 18:19:49.000', NULL, NULL, NULL);

UPDATE

UPDATE public.test SET sxh=1, testmc='test' WHERE testid=611555862087072406;
@baisui1981 baisui1981 added the bug Something isn't working label Dec 8, 2023
@baisui1981 baisui1981 added this to the v4.0.0 milestone Dec 8, 2023

taochunda commented Dec 11, 2023

I used the following to fix the date conversion error:

import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

protected String convertTimestamp(Object input) {
    // instanceof is false for null, so no separate null check is needed
    if (input instanceof Timestamp) {
        Timestamp timestamp = (Timestamp) input;
        // Interpret the Timestamp in the JVM's default zone, then normalize to Asia/Shanghai
        ZonedDateTime zdt = timestamp.toInstant().atZone(ZoneId.systemDefault());
        return zdt.withZoneSameInstant(ZoneId.of("Asia/Shanghai")).format(DateTimeFormatter.ISO_DATE_TIME);
    }
    return null;
}

But it throws a new exception:

test_dto2Rowdata -> skipUpdateBeforeEvent -> Sink: test (1/1)#0 (a4c1454f8df593d68394f513d7629994) switched from RUNNING to FAILED with failure cause: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
fields (org.apache.kafka.connect.data.ConnectSchema)
schema (org.apache.kafka.connect.data.Struct)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:235)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
	at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:239)
	at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:225)
	at com.ververica.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:151)
	at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:439)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.UnsupportedOperationException
	at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	... 24 more
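The "Caused by" frame is Collections$UnmodifiableCollection.add inside Kryo's CollectionSerializer. A minimal sketch of that failure mode (hypothetical class name, independent of Flink/Kryo): Kryo reconstructs collections by calling add() on the target, which an unmodifiable list rejects, matching the ConnectSchema fields list in the serialization trace.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal sketch: adding to an unmodifiable list throws
// UnsupportedOperationException, which is what Kryo's
// CollectionSerializer.read hits when deserializing ConnectSchema.fields.
public class UnmodifiableAddDemo {

    public static boolean addThrows(List<String> target) {
        try {
            target.add("x"); // mirrors CollectionSerializer calling add()
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> fields = Collections.unmodifiableList(new ArrayList<>());
        System.out.println(addThrows(fields)); // true
    }
}
```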

Checking the Flink docker image, the Kryo version in use is:

			<dependency>
				<groupId>com.esotericsoftware.kryo</groupId>
				<artifactId>kryo</artifactId>
				<version>2.24.0</version>
			</dependency>

The root cause was found, as shown in this screenshot:
[screenshot: 2024-02-21 20:38:59]


@baisui1981 baisui1981 self-assigned this Feb 19, 2024
baisui1981 added a commit that referenced this issue Feb 22, 2024
baisui1981 commented:

Because testid is declared as numeric, the PostgreSQL JDBC metadata reports it as a decimal type, but with both precision and scale equal to 0:

testid numeric NOT NULL,

so Flink's internal type for the column becomes decimal(38, 0).

In the actual postgresql -> mysql scenario, the generated create table DDL is:

testid decimal(10,0)

The execution plan produced during Flink SQL synchronization is:

 Source: 192.168.28.201:5432_tis
+- Process
   +- test
      +- [53]:TableSourceScan(table=[[*anonymous_datastream_source$14*]], fields=[testid, sxh, testmc, zjzl, zjzlmc, zjh, hjszss, dh, dz, testlx, testlxmc, yxbz, cjr, cjsj, zhxgr, zhxgsj, scr, scsj, bz])
         +- [54]:DropUpdateBefore
            +- [55]:Calc(select=[CAST(testid AS DECIMAL(10, 0)) AS testid, sxh, testmc, zjzl, zjzlmc, zjh, hjszss, dh, dz, testlx, testlxmc, yxbz, cjr, cjsj, zhxgr, zhxgsj, scr, scsj, bz])
               +- [56]:ConstraintEnforcer[NotNullEnforcer(fields=[testid])]
                  +- [56]:Sink(table=[default_catalog.default_database.test], targetColumns=[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18]], fields=[testid, sxh, testmc, zjzl, zjzlmc, zjh, hjszss, dh, dz, testlx, testlxmc, yxbz, cjr, cjsj, zhxgr, zhxgsj, scr, scsj, bz])

This introduces an extra CAST(testid AS DECIMAL(10, 0)) AS testid conversion. During execution, the testid value becomes null after passing through operator [56]; the likely cause is that narrowing decimal(38, 0) to DECIMAL(10, 0) lowers the precision, so values that do not fit are dropped.
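The precision mismatch can be checked directly with java.math.BigDecimal (a hypothetical illustration, not Flink's actual cast code): the primary key 611555862087072406 has 18 significant digits, so it fits DECIMAL(38, 0) but not DECIMAL(10, 0), which is consistent with the value being lost after the CAST.

```java
import java.math.BigDecimal;

// Illustration of the precision narrowing described above:
// a value whose digit count exceeds the target precision cannot be
// represented as DECIMAL(10, 0) without loss.
public class DecimalPrecisionDemo {

    public static boolean fitsPrecision(BigDecimal value, int precision) {
        // BigDecimal.precision() returns the number of significant digits
        return value.precision() <= precision;
    }

    public static void main(String[] args) {
        BigDecimal testid = new BigDecimal("611555862087072406");
        System.out.println(testid.precision());        // 18 digits
        System.out.println(fitsPrecision(testid, 38)); // fits decimal(38,0)
        System.out.println(fitsPrecision(testid, 10)); // does NOT fit decimal(10,0)
    }
}
```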

baisui1981 added a commit to qlangtech/plugins that referenced this issue Feb 26, 2024