Immutable data

Immutable data

Jack
Hi,

I'm having trouble integrating existing Scala code with Flink, due to the POJO-only requirement.

We're using AnyVal heavily for type safety, and immutable classes as a default. For example, the following does not work:

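import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Test {
  // Value class wrapping an Int for type safety
  class Id(val underlying: Int) extends AnyVal

  // POJO-style class: mutable field plus a no-argument constructor
  class X(var id: Id) {
    def this() { this(new Id(0)) }
  }

  // Source that emits a single X
  class MySource extends SourceFunction[X] {
    def run(ctx: SourceFunction.SourceContext[X]) {
      ctx.collect(new X(new Id(1)))
    }
    def cancel() {}
  }

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new MySource).print
    env.execute("Test")
  }
}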

Currently I'm thinking that I would need either to keep duplicate classes and code for Flink and for non-Flink use, or somehow to use immutable interfaces for the non-Flink code. Both approaches are expensive in terms of development time.

Would you have any guidance on how to integrate Flink with a code base that has immutability as a norm?

Thanks
Re: Immutable data

Stephan Ewen
Hi Jack!

This should be supported; there is no strict requirement for mutable types.

The POJO rules apply only if you want to use the "by-field-name" addressing for keys. In Scala, you should be able to use case classes as well, even if they are immutable.

Can you post the exception that you get?

Greetings,
Stephan


Re: Immutable data

Jack
Hi Stephan!

Here's the trace (Flink 0.9.1 + Scala 2.10.5):

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Cannot instantiate StreamRecord.
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:63)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:66)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Cannot instantiate class.
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:225)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:60)
... 4 more
Caused by: java.lang.IllegalArgumentException: Can not set int field org.myorg.quickstart.Test$X.id to org.myorg.quickstart.Test$Id
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
at sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:98)
at java.lang.reflect.Field.set(Field.java:764)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:232)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:221)
... 5 more
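
The innermost cause in the trace above ("Can not set int field ... to ...Id") can be illustrated without Flink. This is a minimal plain-Scala sketch, not Flink's actual code path: it assumes only that a field whose type is a value class is stored on the JVM as the underlying primitive, and that a reflective writer hands it a boxed Id instead. The object name ErasureDemo and the helper idFieldType are made up for illustration.

```scala
// Value class: a field of this type is stored as a plain Int on the JVM.
class Id(val underlying: Int) extends AnyVal

// Same shape as X in the post: mutable field plus a no-argument constructor.
class X(var id: Id) {
  def this() = this(new Id(0))
}

object ErasureDemo {
  // JVM type of the field named "id" declared in X.
  def idFieldType: Class[_] = classOf[X].getDeclaredField("id").getType

  def main(args: Array[String]): Unit = {
    val field = classOf[X].getDeclaredField("id")
    field.setAccessible(true)

    // The declared Scala type is Id, but the JVM field is a primitive int.
    println(s"JVM type of X.id: ${field.getType}")

    val x = new X()
    try {
      // Passing an Id where an Object is expected boxes it into an Id instance,
      // which cannot be stored in an int field.
      field.set(x, new Id(1))
    } catch {
      case e: IllegalArgumentException =>
        println(s"Reflective set failed: ${e.getMessage}")
    }
  }
}
```

Reflectively setting the underlying Int instead (for example `field.set(x, 1)`) would succeed, which suggests the mismatch lies between the declared Scala type and the erased JVM field type rather than in mutability as such.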

Re: Immutable data

Aljoscha Krettek
In reply to this post by Stephan Ewen
Hi Jack,
Stephan is right, this should work. Unfortunately, the TypeAnalyzer does not correctly detect that it cannot treat your Id class as a POJO. I will open a JIRA issue for that. For the time being, you can use the following call to force the system to use Kryo:

env.getConfig.enableForceKryo();

I hope this helps.

Regards,
Aljoscha


Re: Immutable data

Jack
Thanks Aljoscha, that works!

I also tried passing values to a base class constructor. Modifying the previous example slightly:

class Base(var a: Int)
class X(b: Int) extends Base(b) {
  def this() { this(0) }
}

The code runs (even without Kryo), but the compiler complains about b being immutable.
