Hi!
I have two streams that I connect and call keyBy after:

    aStream
      .connect(bStream)
      .keyBy(aKeySelector, bKeySelector)

I put some debugging code in the bKeySelector. Turns out, it gets called twice from different areas. Stacktrace here: https://gist.github.com/JonasGroeger/8ce218ee1c19f0639fa990f43b5f9e2b

It contains 1 package which gets keyed twice for some reason from org.apache.flink.streaming.api.scala.KeySelectorWithType.getKey(ConnectedStreams.scala:288).

The case class MyPackage has mostly Ints and Strings. The IP addresses, however, are done with java.net.InetAddress references:

    case class MyPackage(..., ip: java.net.InetAddress, ...)

Now for some reason these are 0.0.0.0 on the second call of the keying function, which looks like this:

    // myPackage.ip == 8.1.1.9
    myPackage.ip.getHostAddress.split('.').head.toInt // 8

Also, all Arrays have another reference set. Before and after:

    [B@7fbef45a
    [B@25bb2aa

How come?
So I created a minimal working example where this behaviour can still be seen. It is 15 LOC and can be downloaded here: https://github.com/JonasGroeger/flink-inetaddress-zeroed
To run it, use sbt:

    git clone https://github.com/JonasGroeger/flink-inetaddress-zeroed
    cd flink-inetaddress-zeroed
    sbt run

If you don't want to do the above, fear not, here is the code:

    package org.example

    import java.net.InetAddress

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    case class Person(id: Int, name: String, ip: InetAddress) extends Serializable

    object PersonStream {
      def main(args: Array[String]) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val personStream = env.fromCollection(Seq(
          Person(1, "Toby North", InetAddress.getByName("192.168.0.40"))
        ))

        // Expected: Person(1,Toby North,/192.168.0.40)
        // Actual  : Person(1,Toby North,/0.0.0.0)
        personStream.print()

        env.execute
      }
    }

For some reason, java.net.InetAddress objects get zeroed. Why is that?
Hi Jonas,

The issue has to do with serializing/deserializing InetAddress. If you look at the InetAddress class, the data members that hold the actual IP address are transient fields and as such are not serialized/deserialized in the way that you would expect. This is what is causing the issue.

I suggest you simply do not use InetAddress in your Person data type, but rather a simple String or another properly serializable type.

-Jamie

On Mon, Jan 9, 2017 at 9:46 AM, Jonas <[hidden email]> wrote:
> So I created a minimal working example where this behaviour can still be
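To make the suggestion above concrete, here is a minimal sketch of a Person that keeps the address as a plain String. The helper names inetAddress and firstOctet are hypothetical and not part of the original code; the sketch leaves Flink out so it can be run on its own.

```scala
import java.net.InetAddress

// Hypothetical variant of Person: the address is stored as a String,
// so there is no transient state that could be dropped by a serializer.
case class Person(id: Int, name: String, ip: String) {
  // Rebuild the InetAddress only where it is actually needed.
  // For a literal IP such as "192.168.0.40" this does not perform a DNS lookup.
  def inetAddress: InetAddress = InetAddress.getByName(ip)

  // Same keying idea as in the original post, applied to the String field.
  def firstOctet: Int = ip.split('.').head.toInt
}

object PersonExample {
  def main(args: Array[String]): Unit = {
    val p = Person(1, "Toby North", "192.168.0.40")
    println(p.inetAddress.getHostAddress)
    println(p.firstOctet)
  }
}
```

With this shape, the key selector can read the first octet directly from the String instead of calling getHostAddress on a deserialized InetAddress.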
Hey Jamie,
It turns out you were right :) I wrote my own implementation of IPAddress and then it worked.
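The implementation mentioned above is not shown in the thread. As a rough illustration only, a custom IPv4 type could look like the sketch below. The field names and the parse helper are assumptions, and it is presumably Flink-friendly only because it is an ordinary case class with plain Int fields.

```scala
import scala.util.Try

// Hypothetical IPv4 type (not the poster's actual code): four plain Int
// fields and no transient state.
final case class IPAddress(a: Int, b: Int, c: Int, d: Int) {
  require(Seq(a, b, c, d).forall(o => o >= 0 && o <= 255), "octets must be in 0..255")

  override def toString: String = s"$a.$b.$c.$d"
}

object IPAddress {
  // Parses dotted-quad text such as "8.1.1.9". Returns None when the text
  // does not split into four numeric parts in the range 0..255.
  def parse(s: String): Option[IPAddress] =
    s.split('.') match {
      case Array(a, b, c, d) =>
        Try(IPAddress(a.toInt, b.toInt, c.toInt, d.toInt)).toOption
      case _ => None
    }
}
```

Keying on the first octet then becomes a plain field access, for example IPAddress.parse("8.1.1.9").map(_.a).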