I have implemented a simple UDF to return the length of an array:
package test.ksql.udf;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import java.util.List;

@UdfDescription(name = "sizeof", description = "returns the size of an array")
public class SizeOf {
  @Udf(description = "returns the size of an array")
  public long sizeOfListInteger(final List<Integer> list) { return list.size(); }

  @Udf(description = "returns the size of an array")
  public long sizeOfListLong(final List<Long> list) { return list.size(); }
}
Loading this UDF in the KSQL server on startup results in a KsqlException:
[2018-10-23 15:13:45,840] INFO Adding function sizeof for method public long test.ksql.udf.SizeOf.sizeOfListLong(java.util.Map) (io.confluent.ksql.function.UdfLoader:238)
[2018-10-23 15:13:45,848] WARN Failed to add UDF to the MetaStore. name=sizeof method=public long escid.esp.analytics.ksql.udf.SizeOf.sizeOfListLong(java.util.List) (io.confluent.ksql.function.UdfLoader:213)
io.confluent.ksql.util.KsqlException: Can't add function KsqlFunction{returnType=Schema{INT64}, arguments=[ARRAY], functionName='sizeof', kudfClass=class io.confluent.ksql.function.udf.PluggableUdf, description='returns the size of a map
', pathLoadedFrom='[OMITTED]/ksql/ext/analytics-ksql.1.0-SNAPSHOT-standalone.jar'} as a function with the same name and argument types already exists KsqlFunction{returnType=Schema{INT64}, arguments=[ARRAY], functionName='sizeof', kudfClass=class io.confluent.ksql.function.udf.PluggableUdf, description='returns the size of a map', pathLoadedFrom='[OMITTED]/ksql/ext/analytics-ksql.1.0-SNAPSHOT-standalone.jar'}
at io.confluent.ksql.function.UdfFactory.checkCompatible(UdfFactory.java:69)
at io.confluent.ksql.function.UdfFactory.addFunction(UdfFactory.java:56)
at io.confluent.ksql.function.InternalFunctionRegistry.addFunction(InternalFunctionRegistry.java:104)
at io.confluent.ksql.metastore.MetaStoreImpl.addFunction(MetaStoreImpl.java:227)
at io.confluent.ksql.function.UdfLoader.addFunction(UdfLoader.java:246)
at io.confluent.ksql.function.UdfLoader.lambda$handleUdfAnnotation$8(UdfLoader.java:208)
at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$9.lookForMatches(ScanSpec.java:1390)
at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:696)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1606)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1678)
at io.github.lukehutch.fastclasspathscanner.FastClasspathScanner.scan(FastClasspathScanner.java:1704)
at io.confluent.ksql.function.UdfLoader.loadUdfs(UdfLoader.java:138)
at io.confluent.ksql.function.UdfLoader.lambda$load$2(UdfLoader.java:108)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at io.confluent.ksql.function.UdfLoader.load(UdfLoader.java:108)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:254)
at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:83)
at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:45)
It seems that the different type parameters (Integer, Long) of the List in the two implementations are not distinguished in the signature of the KsqlFunction (in both cases it's arguments=[ARRAY]).
The same problem also arises when using parameterized Maps as the argument types.
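As a quick check outside KSQL, the sketch below uses plain Java reflection to show that the element type of each overload is still recoverable from the method signature, even though the raw parameter type is the same. The class GenericParamCheck and its helper methods are made up for this illustration and are not part of KSQL; it only assumes that the UDF methods are inspected reflectively.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical helper class for illustration only; not part of KSQL.
final class GenericParamCheck {
    // Stand-ins for the two UDF methods from the report.
    static long sizeOfListInteger(final List<Integer> list) { return list.size(); }
    static long sizeOfListLong(final List<Long> list) { return list.size(); }

    private static Method find(final String methodName) {
        try {
            return GenericParamCheck.class.getDeclaredMethod(methodName, List.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // Raw type of the first parameter; the type argument is not visible here.
    static String rawParam(final String methodName) {
        return find(methodName).getParameterTypes()[0].getName();
    }

    // Type argument of the first parameter, read from the generic signature.
    static String elementParam(final String methodName) {
        final Type generic = find(methodName).getGenericParameterTypes()[0];
        final Type element = ((ParameterizedType) generic).getActualTypeArguments()[0];
        return element.getTypeName();
    }

    public static void main(final String[] args) {
        for (final String name : new String[] {"sizeOfListInteger", "sizeOfListLong"}) {
            System.out.println(name + ": raw=" + rawParam(name) + ", element=" + elementParam(name));
        }
    }
}
```

Both methods report the same raw parameter type but different element types, so the information needed to tell the overloads apart exists at the reflection level.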
I took a closer look at the sources. It seems that in io.confluent.ksql.function.UdfFactory.mapToFunctionParameter the FunctionParameter object representing the (parameterized) argument type of the method defining the UDF (e.g., List<String>) is created from the schema.type() of the org.apache.kafka.connect.data.SchemaBuilder object schema for the argument. However, this returns only the raw type org.apache.kafka.connect.data.Schema.Type.ARRAY, not the schema.valueSchema() that represents the type parameter String.
I would suggest adding attributes for the keySchema and the valueSchema of complex types to the class FunctionParameter and updating the methods of the class accordingly.
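A minimal sketch of what this suggestion could look like follows. ParamSignature and its Kind enum are hypothetical, simplified stand-ins for FunctionParameter and Schema.Type, written in plain Java so the example runs without KSQL or Kafka Connect on the classpath. It is meant to illustrate the idea, not to mirror the real class layout.

```java
import java.util.Objects;

// Hypothetical stand-in for FunctionParameter; not the real KSQL class.
final class ParamSignature {
    // Simplified stand-in for org.apache.kafka.connect.data.Schema.Type.
    enum Kind { INT32, INT64, STRING, ARRAY, MAP }

    final Kind kind;
    final ParamSignature keySchema;   // set only for MAP
    final ParamSignature valueSchema; // set for ARRAY and MAP

    private ParamSignature(final Kind kind, final ParamSignature key, final ParamSignature value) {
        this.kind = kind;
        this.keySchema = key;
        this.valueSchema = value;
    }

    static ParamSignature primitive(final Kind kind) {
        return new ParamSignature(kind, null, null);
    }

    static ParamSignature array(final ParamSignature element) {
        return new ParamSignature(Kind.ARRAY, null, element);
    }

    static ParamSignature map(final ParamSignature key, final ParamSignature value) {
        return new ParamSignature(Kind.MAP, key, value);
    }

    // Comparison on the raw kind only, as in the arguments=[ARRAY] signature from the log.
    boolean sameRawKind(final ParamSignature other) {
        return kind == other.kind;
    }

    // Full comparison that also takes the key and value schemas into account.
    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ParamSignature)) {
            return false;
        }
        final ParamSignature that = (ParamSignature) o;
        return kind == that.kind
            && Objects.equals(keySchema, that.keySchema)
            && Objects.equals(valueSchema, that.valueSchema);
    }

    @Override
    public int hashCode() {
        return Objects.hash(kind, keySchema, valueSchema);
    }

    public static void main(final String[] args) {
        final ParamSignature ints = array(primitive(Kind.INT32));
        final ParamSignature longs = array(primitive(Kind.INT64));
        System.out.println("same raw kind: " + ints.sameRawKind(longs));
        System.out.println("equal with value schema: " + ints.equals(longs));
    }
}
```

With the value schema included in equals and hashCode, an array of INT32 and an array of INT64 share the raw kind ARRAY but no longer compare as equal, so a compatibility check built on such a comparison could accept both overloads.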
The bug might be related to #2029.