Join tables created from Datastream whose element scala type has field Option[_]


Join tables created from Datastream whose element scala type has field Option[_]

YI
Hi, all

I am trying to join two DataStreams whose element types are, respectively,
```
case class MyEvent(
  _id: Long = 0L,
  _cId: Long = 0L,
  _url: Option[String] = None,
)
```
and
```
case class MyCategory(
  _id: Long = 0L,
  _name: Option[String] = None,
)
```

When I try to join these two tables with
```
SELECT * FROM rawCategory INNER JOIN rawEvent ON rawEvent._cId = rawCategory._id
```

the following exception is thrown:

```
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 'scala.Option' cannot be used in a join operation because it does not implement a proper hashCode() method.
at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:176)
at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
at org.apache.flink.table.typeutils.TypeCheckUtils$.$anonfun$validateEqualsHashCode$1(TypeCheckUtils.scala:149)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
at org.apache.flink.table.runtime.join.NonWindowJoin.<init>(NonWindowJoin.scala:57)
at org.apache.flink.table.runtime.join.NonWindowInnerJoin.<init>(NonWindowInnerJoin.scala:50)
at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:118)
at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.getJoinOperator(DataStreamJoinToCoProcessTranslator.scala:102)
at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:119)
at org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:251)
at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:412)
at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:273)
at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
at io.redacted.sub.package$.testJoin(package.scala:143)
at io.redacted.sub.package$.process(package.scala:128)
at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
at io.redacted.DataAggregator.main(DataAggregator.scala)

Process finished with exit code 1
```

I tried using a plain String with null, but I encountered several NPEs.
My intention is to use Option to indicate that a value is missing, just like NULL in a database, and hopefully without NPEs.

How should I define my data types? And which configuration needs special care?

Bests,
Yi

Re: Join tables created from Datastream whose element scala type has field Option[_]

Jark Wu
Could you try the Blink planner? I guess this works in the Blink planner.
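For reference, switching planners is done when creating the table environment. A minimal sketch, assuming a Flink version around 1.9/1.10 (matching the `org.apache.flink.table.api.scala` packages in the stack trace above):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Select the Blink planner instead of the legacy (old) planner.
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tEnv = StreamTableEnvironment.create(env, settings)
```

This also requires the `flink-table-planner-blink` dependency on the classpath instead of (or alongside) `flink-table-planner`.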

Besides, it is suggested to use String with null values instead of Option[String].
Flink SQL/Table doesn't know Option and will recognize it as a RAW/generic type, which is rather slow.
There should be no NPE; otherwise, it might be a bug in Flink SQL.
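One way to follow that advice while still keeping Option in application code is to convert at the boundary of the Table API: hand Flink a case class with a plain, nullable String field, and map to and from Option in ordinary Scala. This is only a sketch; the `MyEventRow` class and the `toRow`/`fromRow` helpers below are hypothetical names, not part of any Flink API:

```scala
// Application-side type: Option marks a possibly missing value.
case class MyEvent(
  _id: Long = 0L,
  _cId: Long = 0L,
  _url: Option[String] = None,
)

// Table-side type: _url may be null, which Flink SQL treats as NULL.
case class MyEventRow(_id: Long, _cId: Long, _url: String)

def toRow(e: MyEvent): MyEventRow =
  MyEventRow(e._id, e._cId, e._url.orNull) // None becomes null

def fromRow(r: MyEventRow): MyEvent =
  MyEvent(r._id, r._cId, Option(r._url)) // null becomes None
```

The stream fed into the join would then be something like `events.map(toRow)`, and join results can be mapped back with `fromRow`, so null is only ever visible inside the conversion layer.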

Best,
Jark

On Fri, 19 Jun 2020 at 11:08, YI <[hidden email]> wrote: