Hello, I'm trying to create an Accumulator that takes in 2 columns, where each "value" is therefore a tuple, and results in a tuple of 2 arrays yet no matter what I try I receive an error trace like the one below. (Oddly, using almost the same pattern with 1 input column (a Long for "value") and 1 Array of Longs result/output works perfectly fine, so I'm not sure why suddenly using a Tuple should make much of any difference.) please help. Error trace: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Given parameters do not match any signature. Actual: (java.lang.Long, java.lang.Boolean) Expected: (scala.Tuple2) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: org.apache.flink.table.api.ValidationException: Given parameters do not match any signature. Actual: (java.lang.Long, java.lang.Boolean) Expected: (scala.Tuple2) at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:112) at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:71) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:218) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:134) at java.base/java.util.Optional.orElseGet(Optional.java:369) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:134) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:124) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83) at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84) at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211) at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88) at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178) at org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:237) at org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:528) at org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685) at remind.Job$.main(Job.scala:181) at remind.Job.main(Job.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) This is my code: case class MembershipsIDsAcc( var subscriberIDs: mutable.Set[Long], var ownerIDs: mutable.Set[Long] ) class MembershipsIDsAgg extends AggregateFunction[Row, MembershipsIDsAcc] { override def createAccumulator(): MembershipsIDsAcc = MembershipsIDsAcc(mutable.Set(), mutable.Set()) def accumulate(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = { acc.subscriberIDs.add(value._1) if (value._2) { acc.ownerIDs.add(value._1) } else { acc.ownerIDs.remove(value._1) } } def retract(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = { acc.subscriberIDs.remove(value._1) acc.ownerIDs.remove(value._1) } def resetAccumulator(acc: MembershipsIDsAcc): Unit = { acc.subscriberIDs = mutable.Set() acc.ownerIDs = mutable.Set() } override def getValue(acc: MembershipsIDsAcc): Row = { Row.of(acc.subscriberIDs.toArray, acc.ownerIDs.toArray) } override def getResultType: TypeInformation[Row] = { new RowTypeInfo( createTypeInformation[Array[Long]], createTypeInformation[Array[Long]] ) } } // Usage ... val membershipsByGroupId = membershipsNotDeletedTable .groupBy($"group_id") .aggregate( membershipsIDsAgg( $"user_id", $"owner" ) as ("subscriber_ids", "owner_ids") ) .select($"group_id", $"subscriber_ids", $"owner_ids") ... Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Imports: import java.util.Date import org.apache.flink.api.scala._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.{ DataTypes, EnvironmentSettings, TableEnvironment, TableSchema } import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.data.RowData import org.apache.flink.streaming.api.scala.{ DataStream, StreamExecutionEnvironment } import org.apache.flink.types.Row import org.apache.flink.table.api._ import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase import org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSink import org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory import org.apache.flink.table.descriptors.Elasticsearch import org.apache.flink.table.descriptors._ import org.apache.flink.table.factories.SerializationFormatFactory import org.apache.flink.formats.json import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.types.ListValue import scala.collection.mutable On Sun, Oct 25, 2020 at 6:09 PM Rex Fenley <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
If I switch accumulate to the following: def accumulate(acc: MembershipsIDsAcc, value: org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, java.lang.Boolean]): Unit = {...} I instead receive: Tuple needs to be parameterized by using generics. org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:847) org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:781) org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:735) org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:731) ... On Sun, Oct 25, 2020 at 6:24 PM Rex Fenley <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Hi, I think you can directly declare `def accumulate(acc: MembershipsIDsAcc, value1: Long, value2: Boolean)` Best, Xingbo Rex Fenley <[hidden email]> 于2020年10月26日周一 上午9:28写道:
|
Thanks! Looks like that worked! Fwiw, the error message is very confusing. Is there any way this can be improved? Thanks :) On Sun, Oct 25, 2020 at 6:42 PM Xingbo Huang <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend Remind.com | BLOG | FOLLOW US | LIKE US |
Free forum by Nabble | Edit this page |