Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL创建增量通道到ES时,部署报错 #332

Closed
jiewenk opened this issue Jul 4, 2024 · 14 comments
Closed

MySQL创建增量通道到ES时,部署报错 #332

jiewenk opened this issue Jul 4, 2024 · 14 comments
Labels
bug Something isn't working
Milestone

Comments

@jiewenk
Copy link

jiewenk commented Jul 4, 2024

在已经创建了一个MySQL到ES的批量构建并执行成功的前提下,继续创建MySQL到ES的一个增量通道,在部署时的第三步报错
image
异常信息:

〇〇 start to publish asdfasdfasdf check environment'asdfasdfasdf check environment'
Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
✔✔ successful to publish asdfasdfasdf check environment'asdfasdfasdf check environment'
〇〇 start to publish Incr asdfasdfasdf Compile And Package'Incr asdfasdfasdf Compile And Package'
sinkFactories size:1
skip compile,file:AsdfasdfasdfListener.scala ,reason lastSuccessfulCompileTime:20240704103906 > fileModfiyTime:20240704103905
 ready to be compile file count:0,fileNames:
pkgJar:/opt/data/libs/plugins/flink/asdfasdfasdf/WEB-INF/lib/asdfasdfasdf-incr.jar,tpi:/opt/data/libs/plugins/flink/asdfasdfasdf.tpi
✔✔ successful to publish Incr asdfasdfasdf Compile And Package'Incr asdfasdfasdf Compile And Package'
〇〇 start to publish Incr asdfasdfasdf Deploy'Incr asdfasdfasdf Deploy'
streamUberJar path:/opt/data/cfg_repo/streamscript/asdfasdfasdf/0/asdfasdfasdf-incr.jar
sinkFactories size:1
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_incr.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_batch.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_incr.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_batch.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_incr.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_batch.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_incr.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_batch.xml
Setting server pool to a list of 1 servers: [http://192.168.17.17:9200]
Using single thread/connection supporting basic connection manager
Using default GSON instance
Node Discovery disabled...
Idle connection reaping disabled...
Authentication cache set for preemptive authentication
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_incr.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_batch.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_incr.xml
target xstream file is not exist:/opt/data/cfg_repo/tis_plugin_config/ap/asdfasdfasdf/com.qlangtech.tis.plugin.datax.SelectedTabExtend_batch.xml
Variable character string length must be between 1 and 2147483647 (both inclusive).
org.apache.flink.table.api.ValidationException: Variable character string length must be between 1 and 2147483647 (both inclusive).
        at org.apache.flink.table.types.logical.VarCharType.<init>(VarCharType.java:75)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:285)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:113)
        at com.qlangtech.tis.plugin.ds.DataType.accept(DataType.java:265)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper.mapFlinkCol(AbstractRowDataMapper.java:300)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper.lambda$getAllTabColsMeta$1(AbstractRowDataMapper.java:100)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper.getAllTabColsMeta(AbstractRowDataMapper.java:101)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper.getAllTabColsMeta(AbstractRowDataMapper.java:93)
        at com.qlangtech.tis.plugins.incr.flink.connector.elasticsearch7.ElasticSearchSinkFactory.createSinkFunction(ElasticSearchSinkFactory.java:188)
        at com.qlangtech.tis.plugin.PluginAndCfgsSnapshot.lambda$createFlinkIncrJobManifestCfgAttrs$3(PluginAndCfgsSnapshot.java:309)
        at com.qlangtech.tis.util.RobustReflectionConverter2$PluginMetas.collectMetas(RobustReflectionConverter2.java:90)
        at com.qlangtech.tis.plugin.PluginAndCfgsSnapshot.createFlinkIncrJobManifestCfgAttrs(PluginAndCfgsSnapshot.java:302)
        at com.qlangtech.tis.manage.common.incr.UberJarUtil.createStreamUberJar(UberJarUtil.java:43)
        at com.qlangtech.plugins.incr.flink.launch.TISFlinkCDCStreamFactory.deploy(TISFlinkCDCStreamFactory.java:167)
        at com.qlangtech.tis.coredefine.module.action.TISK8sDelegate.deploy(TISK8sDelegate.java:141)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.lambda$getFlinkJobWorkingOrchestrate$6(CoreAction.java:696)
        at com.qlangtech.tis.datax.job.JobResName$1.accept(JobResName.java:54)
        at com.qlangtech.tis.datax.job.SubJobResName.execute(SubJobResName.java:36)
        at com.qlangtech.tis.datax.job.JobResName.execSubJob(JobResName.java:71)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.lambda$null$2(CoreAction.java:635)
        at com.qlangtech.tis.datax.job.ServerLaunchToken.writeLaunchToken(ServerLaunchToken.java:420)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.lambda$launchIncrSyncChannel$3(CoreAction.java:630)
        at com.qlangtech.tis.datax.job.DefaultSSERunnable.execute(DefaultSSERunnable.java:112)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.launchIncrSyncChannel(CoreAction.java:628)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.doDeployIncrSyncChannal(CoreAction.java:603)
        at sun.reflect.GeneratedMethodAccessor278.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.qlangtech.tis.runtime.module.action.BasicModule.execute(BasicModule.java:176)
        at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at ognl.OgnlRuntime.invokeMethod(OgnlRuntime.java:899)
        at ognl.OgnlRuntime.callAppropriateMethod(OgnlRuntime.java:1544)
        at ognl.ObjectMethodAccessor.callMethod(ObjectMethodAccessor.java:68)
        at com.opensymphony.xwork2.ognl.accessor.XWorkMethodAccessor.callMethodWithDebugInfo(XWorkMethodAccessor.java:98)
        at com.opensymphony.xwork2.ognl.accessor.XWorkMethodAccessor.callMethod(XWorkMethodAccessor.java:90)
        at ognl.OgnlRuntime.callMethod(OgnlRuntime.java:1620)
        at ognl.ASTMethod.getValueBody(ASTMethod.java:91)
        at ognl.SimpleNode.evaluateGetValueBody(SimpleNode.java:212)
        at ognl.SimpleNode.getValue(SimpleNode.java:258)
        at ognl.Ognl.getValue(Ognl.java:470)
        at ognl.Ognl.getValue(Ognl.java:434)
        at com.opensymphony.xwork2.ognl.OgnlUtil$3.execute(OgnlUtil.java:401)
        at com.opensymphony.xwork2.ognl.OgnlUtil.compileAndExecuteMethod(OgnlUtil.java:453)
        at com.opensymphony.xwork2.ognl.OgnlUtil.callMethod(OgnlUtil.java:399)
        at com.opensymphony.xwork2.DefaultActionInvocation.invokeAction(DefaultActionInvocation.java:438)
        at com.opensymphony.xwork2.DefaultActionInvocation.invokeActionOnly(DefaultActionInvocation.java:293)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:254)
        at org.apache.struts2.interceptor.debugging.DebuggingInterceptor.intercept(DebuggingInterceptor.java:250)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.DefaultWorkflowInterceptor.doIntercept(DefaultWorkflowInterceptor.java:179)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.validator.ValidationInterceptor.doIntercept(ValidationInterceptor.java:263)
        at org.apache.struts2.interceptor.validation.AnnotationValidationInterceptor.doIntercept(AnnotationValidationInterceptor.java:49)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ConversionErrorInterceptor.doIntercept(ConversionErrorInterceptor.java:142)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ParametersInterceptor.doIntercept(ParametersInterceptor.java:137)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ParametersInterceptor.doIntercept(ParametersInterceptor.java:137)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.StaticParametersInterceptor.intercept(StaticParametersInterceptor.java:201)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.MultiselectInterceptor.intercept(MultiselectInterceptor.java:67)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.DateTextFieldInterceptor.intercept(DateTextFieldInterceptor.java:133)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.CheckboxInterceptor.intercept(CheckboxInterceptor.java:89)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.FileUploadInterceptor.intercept(FileUploadInterceptor.java:243)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ModelDrivenInterceptor.intercept(ModelDrivenInterceptor.java:101)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ScopedModelDrivenInterceptor.intercept(ScopedModelDrivenInterceptor.java:142)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ChainingInterceptor.intercept(ChainingInterceptor.java:160)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.PrepareInterceptor.doIntercept(PrepareInterceptor.java:175)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.I18nInterceptor.intercept(I18nInterceptor.java:121)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.ServletConfigInterceptor.intercept(ServletConfigInterceptor.java:167)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.AliasInterceptor.intercept(AliasInterceptor.java:203)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ExceptionMappingInterceptor.intercept(ExceptionMappingInterceptor.java:196)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.qlangtech.tis.manage.common.valve.OperationLogInterceptor.doIntercept(OperationLogInterceptor.java:64)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.qlangtech.tis.manage.common.valve.TisExceptionInterceptor.doIntercept(TisExceptionInterceptor.java:89)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.qlangtech.tis.manage.spring.aop.AuthorityCheckAdvice.doIntercept(AuthorityCheckAdvice.java:109)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ParametersInterceptor.doIntercept(ParametersInterceptor.java:137)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.factory.StrutsActionProxy.execute(StrutsActionProxy.java:48)
        at org.apache.struts2.dispatcher.Dispatcher.serviceAction(Dispatcher.java:574)
        at org.apache.struts2.dispatcher.ExecuteOperations.executeAction(ExecuteOperations.java:79)
        at org.apache.struts2.dispatcher.servlet.StrutsServlet.service(StrutsServlet.java:80)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at org.eclipse.jetty.servlet.ServletHolder$NotAsyncServlet.service(ServletHolder.java:1411)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1651)
        at com.qlangtech.tis.manage.common.DefaultFilter.doFilter(DefaultFilter.java:180)
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1630)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:567)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
        at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:602)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)
        at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1377)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:507)
        at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1292)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:59)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.Server.handle(Server.java:501)
        at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:273)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
        at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:375)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
        at java.lang.Thread.run(Thread.java:748)
✕✕ faild to publish Incr asdfasdfasdf Deploy'Incr asdfasdfasdf Deploy'
null
java.lang.reflect.InvocationTargetException: null
        at sun.reflect.GeneratedMethodAccessor278.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.qlangtech.tis.runtime.module.action.BasicModule.execute(BasicModule.java:176)
        at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at ognl.OgnlRuntime.invokeMethod(OgnlRuntime.java:899)
        at ognl.OgnlRuntime.callAppropriateMethod(OgnlRuntime.java:1544)
        at ognl.ObjectMethodAccessor.callMethod(ObjectMethodAccessor.java:68)
        at com.opensymphony.xwork2.ognl.accessor.XWorkMethodAccessor.callMethodWithDebugInfo(XWorkMethodAccessor.java:98)
        at com.opensymphony.xwork2.ognl.accessor.XWorkMethodAccessor.callMethod(XWorkMethodAccessor.java:90)
        at ognl.OgnlRuntime.callMethod(OgnlRuntime.java:1620)
        at ognl.ASTMethod.getValueBody(ASTMethod.java:91)
        at ognl.SimpleNode.evaluateGetValueBody(SimpleNode.java:212)
        at ognl.SimpleNode.getValue(SimpleNode.java:258)
        at ognl.Ognl.getValue(Ognl.java:470)
        at ognl.Ognl.getValue(Ognl.java:434)
        at com.opensymphony.xwork2.ognl.OgnlUtil$3.execute(OgnlUtil.java:401)
        at com.opensymphony.xwork2.ognl.OgnlUtil.compileAndExecuteMethod(OgnlUtil.java:453)
        at com.opensymphony.xwork2.ognl.OgnlUtil.callMethod(OgnlUtil.java:399)
        at com.opensymphony.xwork2.DefaultActionInvocation.invokeAction(DefaultActionInvocation.java:438)
        at com.opensymphony.xwork2.DefaultActionInvocation.invokeActionOnly(DefaultActionInvocation.java:293)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:254)
        at org.apache.struts2.interceptor.debugging.DebuggingInterceptor.intercept(DebuggingInterceptor.java:250)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.DefaultWorkflowInterceptor.doIntercept(DefaultWorkflowInterceptor.java:179)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.validator.ValidationInterceptor.doIntercept(ValidationInterceptor.java:263)
        at org.apache.struts2.interceptor.validation.AnnotationValidationInterceptor.doIntercept(AnnotationValidationInterceptor.java:49)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ConversionErrorInterceptor.doIntercept(ConversionErrorInterceptor.java:142)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ParametersInterceptor.doIntercept(ParametersInterceptor.java:137)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ParametersInterceptor.doIntercept(ParametersInterceptor.java:137)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.StaticParametersInterceptor.intercept(StaticParametersInterceptor.java:201)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.MultiselectInterceptor.intercept(MultiselectInterceptor.java:67)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.DateTextFieldInterceptor.intercept(DateTextFieldInterceptor.java:133)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.CheckboxInterceptor.intercept(CheckboxInterceptor.java:89)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.FileUploadInterceptor.intercept(FileUploadInterceptor.java:243)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ModelDrivenInterceptor.intercept(ModelDrivenInterceptor.java:101)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ScopedModelDrivenInterceptor.intercept(ScopedModelDrivenInterceptor.java:142)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ChainingInterceptor.intercept(ChainingInterceptor.java:160)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.PrepareInterceptor.doIntercept(PrepareInterceptor.java:175)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.I18nInterceptor.intercept(I18nInterceptor.java:121)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.interceptor.ServletConfigInterceptor.intercept(ServletConfigInterceptor.java:167)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.AliasInterceptor.intercept(AliasInterceptor.java:203)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ExceptionMappingInterceptor.intercept(ExceptionMappingInterceptor.java:196)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.qlangtech.tis.manage.common.valve.OperationLogInterceptor.doIntercept(OperationLogInterceptor.java:64)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.qlangtech.tis.manage.common.valve.TisExceptionInterceptor.doIntercept(TisExceptionInterceptor.java:89)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.qlangtech.tis.manage.spring.aop.AuthorityCheckAdvice.doIntercept(AuthorityCheckAdvice.java:109)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at com.opensymphony.xwork2.interceptor.ParametersInterceptor.doIntercept(ParametersInterceptor.java:137)
        at com.opensymphony.xwork2.interceptor.MethodFilterInterceptor.intercept(MethodFilterInterceptor.java:99)
        at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:249)
        at org.apache.struts2.factory.StrutsActionProxy.execute(StrutsActionProxy.java:48)
        at org.apache.struts2.dispatcher.Dispatcher.serviceAction(Dispatcher.java:574)
        at org.apache.struts2.dispatcher.ExecuteOperations.executeAction(ExecuteOperations.java:79)
        at org.apache.struts2.dispatcher.servlet.StrutsServlet.service(StrutsServlet.java:80)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at org.eclipse.jetty.servlet.ServletHolder$NotAsyncServlet.service(ServletHolder.java:1411)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1651)
        at com.qlangtech.tis.manage.common.DefaultFilter.doFilter(DefaultFilter.java:180)
        at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1630)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:567)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
        at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:602)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)
        at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1610)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1377)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:507)
        at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1580)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1292)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:59)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.Server.handle(Server.java:501)
        at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:273)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
        at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:375)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.flink.table.api.ValidationException: Variable character string length must be between 1 and 2147483647 (both inclusive).
        at com.qlangtech.tis.datax.job.ServerLaunchToken.writeLaunchToken(ServerLaunchToken.java:435)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.lambda$launchIncrSyncChannel$3(CoreAction.java:630)
        at com.qlangtech.tis.datax.job.DefaultSSERunnable.execute(DefaultSSERunnable.java:112)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.launchIncrSyncChannel(CoreAction.java:628)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.doDeployIncrSyncChannal(CoreAction.java:603)
        ... 123 common frames omitted
Caused by: org.apache.flink.table.api.ValidationException: Variable character string length must be between 1 and 2147483647 (both inclusive).
        at org.apache.flink.table.types.logical.VarCharType.<init>(VarCharType.java:75)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:285)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:113)
        at com.qlangtech.tis.plugin.ds.DataType.accept(DataType.java:265)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper.mapFlinkCol(AbstractRowDataMapper.java:300)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper.lambda$getAllTabColsMeta$1(AbstractRowDataMapper.java:100)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper.getAllTabColsMeta(AbstractRowDataMapper.java:101)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper.getAllTabColsMeta(AbstractRowDataMapper.java:93)
        at com.qlangtech.tis.plugins.incr.flink.connector.elasticsearch7.ElasticSearchSinkFactory.createSinkFunction(ElasticSearchSinkFactory.java:188)
        at com.qlangtech.tis.plugin.PluginAndCfgsSnapshot.lambda$createFlinkIncrJobManifestCfgAttrs$3(PluginAndCfgsSnapshot.java:309)
        at com.qlangtech.tis.util.RobustReflectionConverter2$PluginMetas.collectMetas(RobustReflectionConverter2.java:90)
        at com.qlangtech.tis.plugin.PluginAndCfgsSnapshot.createFlinkIncrJobManifestCfgAttrs(PluginAndCfgsSnapshot.java:302)
        at com.qlangtech.tis.manage.common.incr.UberJarUtil.createStreamUberJar(UberJarUtil.java:43)
        at com.qlangtech.plugins.incr.flink.launch.TISFlinkCDCStreamFactory.deploy(TISFlinkCDCStreamFactory.java:167)
        at com.qlangtech.tis.coredefine.module.action.TISK8sDelegate.deploy(TISK8sDelegate.java:141)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.lambda$getFlinkJobWorkingOrchestrate$6(CoreAction.java:696)
        at com.qlangtech.tis.datax.job.JobResName$1.accept(JobResName.java:54)
        at com.qlangtech.tis.datax.job.SubJobResName.execute(SubJobResName.java:36)
        at com.qlangtech.tis.datax.job.JobResName.execSubJob(JobResName.java:71)
        at com.qlangtech.tis.coredefine.module.action.CoreAction.lambda$null$2(CoreAction.java:635)
        at com.qlangtech.tis.datax.job.ServerLaunchToken.writeLaunchToken(ServerLaunchToken.java:420)
        ... 127 common frames omitted
@jiewenk jiewenk closed this as completed Jul 4, 2024
@jiewenk jiewenk reopened this Jul 4, 2024
@jiewenk
Copy link
Author

jiewenk commented Jul 4, 2024

额,我重新把机器扩容了,重复一样的步骤,还是在同样的地方报错,求作者大大指点

@jiewenk
Copy link
Author

jiewenk commented Jul 4, 2024

MySQL增量同步ES以及MySQL增量同步MySQL,错误如上所示

@baisui1981
Copy link
Member

关键错误信息在这里

org.apache.flink.table.api.ValidationException: Variable character string length must be between 1 and 2147483647 (both inclusive).
        at org.apache.flink.table.types.logical.VarCharType.<init>(VarCharType.java:75)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:285)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:113)
        at com.qlangtech.tis.plugin.ds.DataType.accept(DataType.java:265)

能否提供一下mysql的 create table ddl?

@jiewenk
Copy link
Author

jiewenk commented Jul 4, 2024

关键错误信息在这里

org.apache.flink.table.api.ValidationException: Variable character string length must be between 1 and 2147483647 (both inclusive).
        at org.apache.flink.table.types.logical.VarCharType.<init>(VarCharType.java:75)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:285)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:113)
        at com.qlangtech.tis.plugin.ds.DataType.accept(DataType.java:265)

能否提供一下mysql的 create table ddl?

主要是从MySQL到ES全量是成功了的,然后增量就失败了
然后是要源端MySQL的表的ddl吗?

@jiewenk
Copy link
Author

jiewenk commented Jul 4, 2024

这是MySQL表的ddl语句:

CREATE TABLE `video` (
  `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '视频ID',
  `device_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '所属设备ID:可以为空',
  `user_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '所属用户ID',
  `likes` bigint DEFAULT NULL COMMENT '点赞数',
  `share_text` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '分享文字',
  `video_origin` int NOT NULL COMMENT '视频来源:0-终端上传,1-用户自己上传,2-用户自己上传',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  `url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
  `comment_total` int NOT NULL DEFAULT '0' COMMENT '评论数量(含二级评论)',
  `template_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '所使用的的视频模板',
  `video_property` int DEFAULT NULL COMMENT '视频属性:0-素材视频,1-完成剪辑视频',
  `share_count` int DEFAULT '0' COMMENT '分享总数',
  `download_count` int DEFAULT '0' COMMENT '下载总数',
  `collection_count` int DEFAULT '0' COMMENT '收藏总数',
  `browse_count` int DEFAULT '0' COMMENT '浏览总数',
  `score` int DEFAULT '0' COMMENT '视频分数',
  `video_visibility` int NOT NULL COMMENT '视频可见性:0-私有,1-仅粉丝可见,2-公开',
  `grounding_status` int DEFAULT NULL COMMENT '上架状态:0-已下架,1-上架中,2-审核中,3-审核不通过',
  `recommended_status` int DEFAULT NULL COMMENT '推荐状态:0-false,1-true',
  `cancel_reason` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '下架原因',
  `self_read` int DEFAULT NULL COMMENT '是否自己已读:0-未读,1-已读',
  `generate_time` datetime DEFAULT NULL COMMENT '设备上传视频时间',
  `follow` int NOT NULL DEFAULT '0' COMMENT '通过视频达到的关注人数',
  `duration` float DEFAULT '60' COMMENT '视频时长',
  `completion_rate` float NOT NULL DEFAULT '0' COMMENT '完播率',
  `origin_url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
  `cover_url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
  `water_mark_url` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '水印',
  `type` tinyint NOT NULL DEFAULT '0' COMMENT '0:1:',
  `ld` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '流畅',
  `sd` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '标清',
  `hd` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '高清',
  `full_hd` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '全高清',
  `media_type` tinyint NOT NULL DEFAULT '0' COMMENT '媒体类型0:视频,1:图片,默认视频',
  `create_by` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '创建者',
  `update_by` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '更新者',
  `publish_time` timestamp NULL DEFAULT NULL COMMENT '视频发布到广场的时间',
  `width` int DEFAULT NULL,
  `height` int DEFAULT NULL,
  `download_switch` tinyint(1) DEFAULT '0' COMMENT '0打开下载开关 1关闭下载开关',
  `first_frame_md5` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '视频第一帧的md5值',
  `pets_id` varchar(320) NOT NULL DEFAULT '' COMMENT '媒体包含的宠物',
  `mood_id` int DEFAULT NULL COMMENT '关联心情id',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_likes` (`likes`) USING BTREE,
  KEY `idx_user_id` (`user_id`) USING BTREE,
  KEY `idx_video_origin` (`video_origin`) USING BTREE,
  KEY `video_first_frame_md5_index` (`first_frame_md5`) USING BTREE,
  KEY `idx_device_id` (`device_id`) USING BTREE,
  KEY `video_visibility` (`video_visibility`) USING BTREE,
  KEY `video_origin` (`video_origin`) USING BTREE,
  KEY `grounding_status` (`grounding_status`) USING BTREE,
  KEY `video_mix_rec_index` (`video_visibility`,`video_origin`,`grounding_status`,`user_id`,`create_time` DESC) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='视频表';

@baisui1981
Copy link
Member

然后是要源端MySQL的表的ddl吗?

是的。另外,elastic 端schema 编辑页面截一个图看一下

@jiewenk
Copy link
Author

jiewenk commented Jul 4, 2024

image
image
image
image
image

@baisui1981
Copy link
Member

切换到 专家模式 ,把对应的 xml格式的 schema 贴一下

@jiewenk
Copy link
Author

jiewenk commented Jul 4, 2024

专家模式 ,把对应的 xml格式的 schema 贴一下

{
	"column": [{
		"array": false,
		"name": "id",
		"index": true,
		"store": true,
		"pk": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "device_id",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "user_id",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "likes",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"analyzer": "standard",
		"name": "share_text",
		"index": true,
		"store": true,
		"type": "text",
		"doc_values": false
	}, {
		"array": false,
		"name": "video_origin",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "create_time",
		"index": true,
		"store": true,
		"type": "date",
		"doc_values": true
	}, {
		"array": false,
		"name": "update_time",
		"index": true,
		"store": true,
		"type": "date",
		"doc_values": true
	}, {
		"array": false,
		"name": "url",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "comment_total",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "template_id",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "video_property",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "share_count",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "download_count",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "collection_count",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "browse_count",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "video_visibility",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "grounding_status",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "recommended_status",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "cancel_reason",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "self_read",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "generate_time",
		"index": true,
		"store": true,
		"type": "date",
		"doc_values": true
	}, {
		"array": false,
		"name": "follow",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "duration",
		"index": true,
		"store": true,
		"type": "double",
		"doc_values": false
	}, {
		"array": false,
		"name": "completion_rate",
		"index": true,
		"store": true,
		"type": "double",
		"doc_values": false
	}, {
		"array": false,
		"name": "origin_url",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "cover_url",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "water_mark_url",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "type",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "ld",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "sd",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "hd",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "full_hd",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "media_type",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "create_by",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "update_by",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"name": "publish_time",
		"index": true,
		"store": true,
		"type": "date",
		"doc_values": true
	}, {
		"array": false,
		"name": "width",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "height",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}, {
		"array": false,
		"name": "download_switch",
		"index": true,
		"store": true,
		"type": "boolean",
		"doc_values": false
	}, {
		"array": false,
		"name": "first_frame_md5",
		"index": true,
		"store": true,
		"type": "keyword",
		"doc_values": false
	}, {
		"array": false,
		"analyzer": "standard",
		"name": "pets_id",
		"index": true,
		"store": true,
		"type": "text",
		"doc_values": false
	}, {
		"array": false,
		"name": "mood_id",
		"index": true,
		"store": true,
		"type": "long",
		"doc_values": false
	}]
}

TIS默认生成的

{
	"column":[
		{
			"array":false,
			"name":"id",
			"index":true,
			"store":true,
			"pk":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"device_id",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"user_id",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"likes",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"share_text",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"video_origin",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"create_time",
			"index":true,
			"store":true,
			"type":"date",
			"doc_values":false
		},
		{
			"array":false,
			"name":"update_time",
			"index":true,
			"store":true,
			"type":"date",
			"doc_values":false
		},
		{
			"array":false,
			"name":"url",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"comment_total",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"template_id",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"video_property",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"share_count",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"download_count",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"collection_count",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"browse_count",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"score",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"video_visibility",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"grounding_status",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"recommended_status",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"cancel_reason",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"self_read",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"generate_time",
			"index":true,
			"store":true,
			"type":"date",
			"doc_values":false
		},
		{
			"array":false,
			"name":"follow",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"duration",
			"index":true,
			"store":true,
			"type":"double",
			"doc_values":false
		},
		{
			"array":false,
			"name":"completion_rate",
			"index":true,
			"store":true,
			"type":"double",
			"doc_values":false
		},
		{
			"array":false,
			"name":"origin_url",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"cover_url",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"water_mark_url",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"type",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"ld",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"sd",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"hd",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"full_hd",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"media_type",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"create_by",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"update_by",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"publish_time",
			"index":true,
			"store":true,
			"type":"date",
			"doc_values":false
		},
		{
			"array":false,
			"name":"width",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"height",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		},
		{
			"array":false,
			"name":"download_switch",
			"index":true,
			"store":true,
			"type":"boolean",
			"doc_values":false
		},
		{
			"array":false,
			"name":"first_frame_md5",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"pets_id",
			"index":true,
			"store":true,
			"type":"keyword",
			"doc_values":false
		},
		{
			"array":false,
			"name":"mood_id",
			"index":true,
			"store":true,
			"type":"long",
			"doc_values":false
		}
	]
}

@baisui1981
Copy link
Member

ok,我查一下

@baisui1981 baisui1981 added the bug Something isn't working label Jul 4, 2024
@baisui1981 baisui1981 added this to the V4.0.1 milestone Jul 4, 2024
@baisui1981
Copy link
Member

baisui1981 commented Jul 4, 2024

原因分析

由于设置text类型的类型,内部对应jdbc type为 JDBCTypes.LONGVARCHAR 没有设置colSize

new DataTypeMeta(new DataType(JDBCTypes.LONGVARCHAR))

所以在生成flink任务时会报异常

org.apache.flink.table.api.ValidationException: Variable character string length must be between 1 and 2147483647 (both inclusive).
        at org.apache.flink.table.types.logical.VarCharType.<init>(VarCharType.java:75)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:285)
        at com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper$DefaultTypeVisitor.varcharType(AbstractRowDataMapper.java:113)
        at com.qlangtech.tis.plugin.ds.DataType.accept(DataType.java:265)

只需添加一个Integer.MAX_VALUE参数即可:

new DataTypeMeta(new DataType(JDBCTypes.LONGVARCHAR, Integer.MAX_VALUE))

@jiewenk 有微信联系方式不,发你一个jar替换一下即可

@jiewenk
Copy link
Author

jiewenk commented Jul 5, 2024

有微信联系方式不,发你一个jar替换一下即可

18482162262,感谢!
05205FC6

@jiewenk jiewenk reopened this Jul 5, 2024
@jiewenk
Copy link
Author

jiewenk commented Jul 5, 2024

成功创建MySQL到ES的增量通道后,如果MySQL的字段值允许为空值,在增量同步时,会报空指针
部分异常堆栈如下:

Caused by: java.lang.NullPointerException
	at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
	at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$IntGetter.getObject(RowFieldGetterFactory.java:226)
	at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getVal(RowFieldGetterFactory.java:246)
	at com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory$BasicGetter.getFieldOrNull(RowFieldGetterFactory.java:241)
	at com.qlangtech.plugins.incr.flink.cdc.FlinkCol.getRowDataVal(FlinkCol.java:65)
	at com.qlangtech.tis.plugins.incr.flink.connector.elasticsearch7.ElasticSearchSinkFactory$DefaultElasticsearchSinkFunction.createIndexRequest(ElasticSearchSinkFactory.java:270)
	at com.qlangtech.tis.plugins.incr.flink.connector.elasticsearch7.ElasticSearchSinkFactory$DefaultElasticsearchSinkFunction.process(ElasticSearchSinkFactory.java:291)
	at com.qlangtech.tis.plugins.incr.flink.connector.elasticsearch7.ElasticSearchSinkFactory$DefaultElasticsearchSinkFunction.process(ElasticSearchSinkFactory.java:245)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:318)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:61)
	at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
	at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:47)
	at com.qlangtech.tis.realtime.SourceProcessFunction.processElement(SourceProcessFunction.java:32)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:150)
	at com.qlangtech.plugins.incr.flink.cdc.TISDeserializationSchema.deserialize(TISDeserializationSchema.java:109)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:128)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:110)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:82)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:55)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:750)

@baisui1981
Copy link
Member

已经修复 tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/plugins/incr/flink/cdc/RowFieldGetterFactory.java

 private Object getVal(RowData rowData) {
           // 添加是否是空的判断即可
            if (rowData.isNullAt(this.colIndex)) {
                return null;
            }
            try {
                return getObject((GenericRowData) rowData);
            } catch (ClassCastException e) {
                throw new RuntimeException("colIdx:" + this.colIndex + ",colName:" + this.colName, e);
            }
        }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants