keyBy called twice. Second time, InetAddress and Array[Byte] are empty

Jonas Gröger
Hi!

I have two streams that I connect and then call keyBy on:
    aStream
      .connect(bStream)
      .keyBy(aKeySelector, bKeySelector)

I put some debugging code in bKeySelector. It turns out it gets called twice, from different places. Stack trace here: https://gist.github.com/JonasGroeger/8ce218ee1c19f0639fa990f43b5f9e2b

It shows a single package that, for some reason, gets keyed twice from
    org.apache.flink.streaming.api.scala.KeySelectorWithType.getKey(ConnectedStreams.scala:288)
The case class MyPackage has mostly Ints and Strings. The IP addresses, however, are stored as java.net.InetAddress references:
    case class MyPackage(..., ip: java.net.InetAddress, ...)
For some reason, these are 0.0.0.0 on the second call of the key selector function, which looks like this:
    // myPackage.ip == 8.1.1.9
    myPackage.ip.getHostAddress.split('.').head.toInt // 8
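A small, self-contained sketch of the key function above helps separate the key logic from the data problem. The object and method names are hypothetical, and both variants assume IPv4 addresses (for IPv6, `getHostAddress` contains no dots, so the string variant would throw in `toInt`). It shows that the key selector itself is not at fault: once the address has been replaced by 0.0.0.0, every record silently maps to key 0.

```scala
import java.net.InetAddress

object FirstOctetKey {
  // Same logic as the key selector above: the first dotted-decimal
  // component of the textual address (IPv4 only).
  def viaHostAddress(ip: InetAddress): Int =
    ip.getHostAddress.split('.').head.toInt

  // Equivalent for IPv4 without string parsing: the first raw byte,
  // masked to 0..255 because Byte is signed on the JVM.
  def viaRawBytes(ip: InetAddress): Int =
    ip.getAddress()(0) & 0xff

  def main(args: Array[String]): Unit = {
    // getByName with a literal IP does not perform a DNS lookup.
    val original = InetAddress.getByName("8.1.1.9")
    // What the address looked like on the second call in this thread.
    val zeroed = InetAddress.getByAddress(Array[Byte](0, 0, 0, 0))
    println(viaHostAddress(original)) // 8
    println(viaHostAddress(zeroed))   // 0
    println(viaRawBytes(original))    // 8
  }
}
```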
Also, all arrays are now different references. Before and after:
    [B@7fbef45a
    [B@25bb2aa
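The `[B@...` strings are just `Object.toString` for a byte array: the type code `[B` plus the identity hash code in hex. A new suffix therefore only says the array is a different object, which is expected whenever a record is copied or goes through a serialize/deserialize cycle; by itself it says nothing about the contents. A minimal sketch, using plain Java serialization as a stand-in for whatever copy Flink performed (an assumption), with a hypothetical `ArrayIdentityDemo` object:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ArrayIdentityDemo {
  // Serialize and deserialize with plain Java serialization, producing a copy.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val before = Array[Byte](8, 1, 1, 9)
    val after = roundTrip(before)
    // Two "[B@..." strings; the hex parts are identity hashes and vary per run.
    println(s"$before vs $after")
    println(before eq after)                        // false: distinct objects
    println(java.util.Arrays.equals(before, after)) // true: identical contents
  }
}
```

To tell whether the arrays were actually emptied, compare contents (for example with `java.util.Arrays.equals` or `mkString`) rather than the printed references.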

How come?

Jonas Gröger
So I created a minimal working example that still shows this behaviour. It is 15 lines of code and can be downloaded here: https://github.com/JonasGroeger/flink-inetaddress-zeroed

To run it, use sbt:
    git clone https://github.com/JonasGroeger/flink-inetaddress-zeroed
    cd flink-inetaddress-zeroed
    sbt run
If you don't want to do that, fear not; here is the code:
    package org.example

    import java.net.InetAddress

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    case class Person(id: Int, name: String, ip: InetAddress) extends Serializable

    object PersonStream {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val personStream = env.fromCollection(Seq(
          Person(1, "Toby North", InetAddress.getByName("192.168.0.40"))
        ))

        // Expected: Person(1,Toby North,/192.168.0.40)
        // Actual  : Person(1,Toby North,/0.0.0.0)
        personStream.print()

        env.execute()
      }
    }
For some reason, java.net.InetAddress objects get zeroed. Why is that?

Jamie Grier
Hi Jonas,

The issue has to do with serializing/deserializing InetAddress. If you look at the InetAddress class, the data members that hold the actual IP address are transient fields, and as such they are not serialized/deserialized the way you would expect. This is what is causing the issue.
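The effect of a transient field can be reproduced without Flink. The sketch below uses a hypothetical `FakeAddress` class that, like `InetAddress`, keeps its data in a transient field, and round-trips it through plain Java serialization, which does not write transient fields. The real `InetAddress` has custom serialization logic so that Java serialization preserves it; the assumption here, consistent with this thread, is that Flink instead treated it as a generic type and used a field-by-field serializer (Kryo), which by default also skips transient fields and so loses the address.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for InetAddress: the data lives in a transient field,
// and there is no custom writeObject/readObject to preserve it.
class FakeAddress(initial: Array[Byte]) extends Serializable {
  @transient val bytes: Array[Byte] = initial
}

object TransientDemo {
  // Plain Java serialization round trip.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val before = new FakeAddress(Array[Byte](8, 1, 1, 9))
    val after = roundTrip(before)
    println(before.bytes.mkString(".")) // 8.1.1.9
    println(after.bytes == null)        // true: the transient field was not written
  }
}
```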

I suggest you simply don't use InetAddress in your Person data type, but rather a simple String or another properly serializable type.
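A minimal sketch of that suggestion applied to the example's Person: the address is kept as a String, which Flink's Scala case-class serializer handles like any other String field. The `inetAddress` helper and the `PersonWithStringIp` object are hypothetical additions for illustration. In the example program, the element would then be built as `Person(1, "Toby North", "192.168.0.40")`.

```scala
import java.net.InetAddress

// The thread's Person, with the address stored as a String instead of an
// InetAddress, so no transient JVM-internal state is involved.
case class Person(id: Int, name: String, ip: String) {
  // Hypothetical helper: rebuild an InetAddress only where one is needed.
  // For a literal IP such as "192.168.0.40", getByName does no DNS lookup.
  def inetAddress: InetAddress = InetAddress.getByName(ip)
}

object PersonWithStringIp {
  def main(args: Array[String]): Unit = {
    val p = Person(1, "Toby North", "192.168.0.40")
    println(p)                            // Person(1,Toby North,192.168.0.40)
    println(p.inetAddress.getHostAddress) // 192.168.0.40
  }
}
```

Note that if the String holds a host name rather than a literal IP, `getByName` would perform a DNS lookup on every call.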

-Jamie


On Mon, Jan 9, 2017 at 9:46 AM, Jonas wrote:
For some reason, java.net.InetAddress objects get zeroed. Why is that?

--

Jamie Grier
data Artisans, Director of Applications Engineering

Jonas Gröger
Hey Jamie,

It turns out you were right :) I wrote my own IPAddress implementation, and then it worked.
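Jonas's IPAddress class isn't shown in the thread, so the following is only a guess at what such a type could look like. Assumptions: IPv4 only, the hypothetical name `IPv4Address`, and the four octets packed into a single `Int`, which Flink's case-class serializer stores like any other `Int` field. Conversions to and from `InetAddress` happen only at the edges, outside the data that Flink serializes.

```scala
import java.net.InetAddress

// Hypothetical IPv4-only address type: four octets packed into one Int.
final case class IPv4Address(bits: Int) {
  // Unpack the octets, most significant first; >>> keeps the shift unsigned.
  def octets: Seq[Int] = Seq(24, 16, 8, 0).map(shift => (bits >>> shift) & 0xff)

  override def toString: String = octets.mkString(".")

  // Convert back to a JDK address only where an API requires one.
  def toInetAddress: InetAddress =
    InetAddress.getByAddress(octets.map(_.toByte).toArray)
}

object IPv4Address {
  // Parses dotted-decimal text such as "192.168.0.40"; rejects other shapes.
  def parse(text: String): IPv4Address = {
    val parts = text.split('.')
    require(parts.length == 4, s"expected 4 octets: $text")
    val octets = parts.map(_.toInt)
    require(octets.forall(o => o >= 0 && o <= 255), s"octet out of range: $text")
    IPv4Address(octets.foldLeft(0)((acc, o) => (acc << 8) | o))
  }

  // Builds the compact form from a JDK address (IPv4 only).
  def fromInetAddress(ip: InetAddress): IPv4Address = {
    val raw = ip.getAddress
    require(raw.length == 4, "IPv4 only")
    IPv4Address(raw.foldLeft(0)((acc, b) => (acc << 8) | (b & 0xff)))
  }
}

object IPv4AddressDemo {
  def main(args: Array[String]): Unit = {
    val ip = IPv4Address.parse("192.168.0.40")
    println(ip)                              // 192.168.0.40
    println(ip.toInetAddress.getHostAddress) // 192.168.0.40
  }
}
```

Packing into an `Int` keeps each record small and makes equality and hashing cheap for keying; storing a String instead would be simpler if IPv6 support matters.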