Hi!
I have two streams that I connect and call keyBy after.
aStream
  .connect(bStream)
  .keyBy(aKeySelector, bKeySelector)
I put some debugging code in the bKeySelector. It turns out that it gets called twice from different places. The stack trace is here: https://gist.github.com/JonasGroeger/8ce218ee1c19f0639fa990f43b5f9e2b It contains one package, which for some reason gets keyed twice from
org.apache.flink.streaming.api.scala.KeySelectorWithType.getKey(ConnectedStreams.scala:288)
The case class MyPackage has mostly Ints and Strings. The IP addresses, however, are stored as java.net.InetAddress references:
case class MyPackage(..., ip: java.net.InetAddress, ...)
Now for some reason these are 0.0.0.0 on the second call of the keying function, which looks like this:
// myPackage.ip == 8.1.1.9
myPackage.ip.getHostAddress.split('.').head.toInt // 8
Also, all Arrays point to a different reference afterwards. Before and after:
[B@7fbef45a
[B@25bb2aa
How come?
So I created a minimal working example where this behaviour can still be seen. It is 15 LOC and can be downloaded here: https://github.com/JonasGroeger/flink-inetaddress-zeroed
To run it, use sbt:
git clone https://github.com/JonasGroeger/flink-inetaddress-zeroed
cd flink-inetaddress-zeroed
sbt run
If you don't want to do the above, fear not. Here is the code:
package org.example

import java.net.InetAddress
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

case class Person(id: Int, name: String, ip: InetAddress) extends Serializable

object PersonStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val personStream = env.fromCollection(Seq(
      Person(1, "Toby North", InetAddress.getByName("192.168.0.40"))
    ))

    // Expected: Person(1,Toby North,/192.168.0.40)
    // Actual  : Person(1,Toby North,/0.0.0.0)
    personStream.print()

    env.execute()
  }
}
For some reason, java.net.InetAddress objects get zeroed. Why is that?
Hi Jonas,

The issue has to do with serializing/deserializing InetAddress. If you look at the InetAddress class, the data members that hold the actual IP address are transient fields and as such are not serialized/deserialized in the way that you would expect. This is what is causing the issue. I suggest you simply do not use InetAddress in your Person data type, but rather a simple String or another properly serializable type.

-Jamie

On Mon, Jan 9, 2017 at 9:46 AM, Jonas <[hidden email]> wrote:
> So I created a minimal working example where this behaviour can still be
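
Jamie's suggestion of keeping the address as a plain String could look roughly like the sketch below. It is only an illustration: the helper names fromInet and inetAddress are hypothetical and do not appear in the thread, and the code deliberately has no Flink dependency so it can be tried on its own.

```scala
import java.net.InetAddress

// Hypothetical variant of the Person type from the example above:
// the address is stored as a plain String instead of an InetAddress,
// so no transient fields are involved when the record is serialized.
case class Person(id: Int, name: String, ip: String) {
  // Rebuild an InetAddress on demand. For a literal IP address,
  // getByName only validates the format and does no DNS lookup.
  def inetAddress: InetAddress = InetAddress.getByName(ip)
}

object Person {
  // Hypothetical convenience constructor for code that already holds an InetAddress.
  def fromInet(id: Int, name: String, addr: InetAddress): Person =
    Person(id, name, addr.getHostAddress)
}
```

With this shape, the conversion to and from InetAddress happens only at the edges of the job, and the value that travels through the stream is an ordinary String.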
Hey Jamie,
It turns out you were right :) I wrote my own implementation of IPAddress and then it worked.
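
Jonas's actual IPAddress implementation is not shown in the thread. As a minimal sketch of what such a type might look like, assuming IPv4 only, the hypothetical IPv4Address below stores the four octets as plain Ints, so every field is an ordinary, non-transient value:

```scala
// Hypothetical stand-in for a hand-written IP address type (IPv4 only).
// All state lives in four non-transient Int fields.
final case class IPv4Address(a: Int, b: Int, c: Int, d: Int) {
  require(Seq(a, b, c, d).forall(o => o >= 0 && o <= 255), "octets must be in 0..255")

  // Dotted-quad form, e.g. "8.1.1.9"
  override def toString: String = s"$a.$b.$c.$d"
}

object IPv4Address {
  // Parse a dotted-quad string; throws IllegalArgumentException on malformed input
  // (NumberFormatException, raised by toInt for non-numeric parts, is a subclass of it).
  def parse(s: String): IPv4Address = s.split('.') match {
    case Array(a, b, c, d) => IPv4Address(a.toInt, b.toInt, c.toInt, d.toInt)
    case _ => throw new IllegalArgumentException(s"not a dotted-quad IPv4 address: $s")
  }
}
```

With a type like this, the key used in the original question (the first octet) is simply the field a, for example IPv4Address.parse("8.1.1.9").a, with no string splitting inside the key selector.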
