Failing to create Accumulator over multiple columns

Rex Fenley
Hello,

I'm trying to create an Accumulator that takes in 2 columns, where each "value" is therefore a tuple, and results in a tuple of 2 arrays, yet no matter what I try I receive an error trace like the one below.

(Oddly, using almost the same pattern with 1 input column (a Long for "value") and 1 Array of Longs as the result/output works perfectly fine, so I'm not sure why using a Tuple should suddenly make much of a difference.)
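
For reference, the working single-column version looks roughly like this (a simplified sketch; the names here are illustrative, not my exact code):

case class SubscriberIDsAcc(var subscriberIDs: mutable.Set[Long])

class SubscriberIDsAgg extends AggregateFunction[Array[Long], SubscriberIDsAcc] {

  override def createAccumulator(): SubscriberIDsAcc =
    SubscriberIDsAcc(mutable.Set())

  // One scalar input column; the planner resolves this signature fine.
  def accumulate(acc: SubscriberIDsAcc, userId: Long): Unit =
    acc.subscriberIDs.add(userId)

  def retract(acc: SubscriberIDsAcc, userId: Long): Unit =
    acc.subscriberIDs.remove(userId)

  override def getValue(acc: SubscriberIDsAcc): Array[Long] =
    acc.subscriberIDs.toArray
}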

Please help.

Error trace:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Given parameters do not match any signature.
Actual: (java.lang.Long, java.lang.Boolean)
Expected: (scala.Tuple2)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Given parameters do not match any signature.
Actual: (java.lang.Long, java.lang.Boolean)
Expected: (scala.Tuple2)
at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:112)
at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:71)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:218)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:134)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:134)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:124)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:237)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:528)
at org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
at remind.Job$.main(Job.scala:181)
at remind.Job.main(Job.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)

This is my code:

case class MembershipsIDsAcc(
    var subscriberIDs: mutable.Set[Long],
    var ownerIDs: mutable.Set[Long]
)

class MembershipsIDsAgg extends AggregateFunction[Row, MembershipsIDsAcc] {

  override def createAccumulator(): MembershipsIDsAcc =
    MembershipsIDsAcc(mutable.Set(), mutable.Set())

  def accumulate(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
    acc.subscriberIDs.add(value._1)
    if (value._2) {
      acc.ownerIDs.add(value._1)
    } else {
      acc.ownerIDs.remove(value._1)
    }
  }

  def retract(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
    acc.subscriberIDs.remove(value._1)
    acc.ownerIDs.remove(value._1)
  }

  def resetAccumulator(acc: MembershipsIDsAcc): Unit = {
    acc.subscriberIDs = mutable.Set()
    acc.ownerIDs = mutable.Set()
  }

  override def getValue(acc: MembershipsIDsAcc): Row = {
    Row.of(acc.subscriberIDs.toArray, acc.ownerIDs.toArray)
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(
      createTypeInformation[Array[Long]],
      createTypeInformation[Array[Long]]
    )
  }
}

// Usage
...
val membershipsByGroupId =
  membershipsNotDeletedTable
    .groupBy($"group_id")
    .aggregate(
      membershipsIDsAgg(
        $"user_id",
        $"owner"
      ) as ("subscriber_ids", "owner_ids")
    )
    .select($"group_id", $"subscriber_ids", $"owner_ids")
...

--
Rex Fenley  |  Software Engineer - Mobile and Backend  |  Remind.com

Re: Failing to create Accumulator over multiple columns

Rex Fenley
Imports:

import java.util.Date
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{
  DataTypes,
  EnvironmentSettings,
  TableEnvironment,
  TableSchema
}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.data.RowData
import org.apache.flink.streaming.api.scala.{
  DataStream,
  StreamExecutionEnvironment
}
import org.apache.flink.types.Row
import org.apache.flink.table.api._
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase
import org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSink
import org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory
import org.apache.flink.table.descriptors.Elasticsearch
import org.apache.flink.table.descriptors._
import org.apache.flink.table.factories.SerializationFormatFactory
import org.apache.flink.formats.json
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.ListValue
import scala.collection.mutable

--
Rex Fenley  |  Software Engineer - Mobile and Backend  |  Remind.com

Re: Failing to create Accumulator over multiple columns

Rex Fenley
If I switch accumulate to the following:
def accumulate(
    acc: MembershipsIDsAcc,
    value: org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, java.lang.Boolean]
): Unit = {...}


I instead receive:

Tuple needs to be parameterized by using generics.
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:847)
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:781)
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:735)
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:731)
...

--
Rex Fenley  |  Software Engineer - Mobile and Backend  |  Remind.com

Re: Failing to create Accumulator over multiple columns

Xingbo Huang
Hi,
I think you can directly declare `def accumulate(acc: MembershipsIDsAcc, value1: Long, value2: Boolean)`
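
For example, a minimal sketch of the adjusted methods (the two-argument signature is the suggestion above; the parameter names are illustrative, and the rest of the class stays as before):

  // Each input column becomes its own parameter, so the planner can match
  // the actual call arguments (java.lang.Long, java.lang.Boolean) directly.
  def accumulate(acc: MembershipsIDsAcc, userId: Long, owner: Boolean): Unit = {
    acc.subscriberIDs.add(userId)
    if (owner) acc.ownerIDs.add(userId) else acc.ownerIDs.remove(userId)
  }

  // retract takes the same argument list as accumulate.
  def retract(acc: MembershipsIDsAcc, userId: Long, owner: Boolean): Unit = {
    acc.subscriberIDs.remove(userId)
    acc.ownerIDs.remove(userId)
  }

The call site membershipsIDsAgg($"user_id", $"owner") already passes the two columns as separate arguments, so only these signatures need to change.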

Best,
Xingbo


Re: Failing to create Accumulator over multiple columns

Rex Fenley
Thanks! Looks like that worked!

FWIW, the error message is very confusing. Is there any way it could be improved?

Thanks :)

--
Rex Fenley  |  Software Engineer - Mobile and Backend  |  Remind.com