Hi,
I'm having trouble integrating existing Scala code with Flink because of the POJO-only requirement. We use AnyVal heavily for type safety, and immutable classes are our default. For example, the following does not work:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Test {
  // Value class wrapping an Int, used for type safety
  class Id(val underlying: Int) extends AnyVal

  // Mutable holder with a no-arg constructor, as the POJO rules require
  class X(var id: Id) {
    def this() = this(new Id(0))
  }

  class MySource extends SourceFunction[X] {
    def run(ctx: SourceFunction.SourceContext[X]): Unit = {
      ctx.collect(new X(new Id(1)))
    }
    def cancel(): Unit = {}
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new MySource).print()
    env.execute("Test")
  }
}
```

Currently I think I would need either duplicate classes and code for Flink and for non-Flink code, or somehow use immutable interfaces for the non-Flink code. Both ways are expensive in terms of development time.

Would you have any guidance on how to integrate Flink with a code base where immutability is the norm?

Thanks
Hi Jack!

This should be supported; there is no strict requirement for mutable types. The POJO rules apply only if you want to use "by-field-name" addressing for keys. In Scala, you should also be able to use case classes, even if they are immutable.

Can you post the exception that you get?

Greetings,
Stephan

On Wed, Sep 23, 2015 at 1:29 PM, Jack <[hidden email]> wrote:
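To illustrate why immutability is not an obstacle for case classes: as far as I understand the Scala API, Flink's case-class serializer reads the fields by position and builds each record through the constructor, so neither setters nor a no-arg constructor are needed. The plain-Scala sketch below only imitates that idea; `Reading`, `toFields`, `fromFields` and `hasNoArgConstructor` are hypothetical names, and this is not Flink's actual serializer code.

```scala
object CaseClassRoundTrip {
  // Immutable case class: only vals, no setters, no no-arg constructor.
  case class Reading(sensor: String, value: Int)

  // "Serialize": read the fields by position via the Product interface
  // that every case class implements.
  def toFields(r: Reading): List[Any] = r.productIterator.toList

  // "Deserialize": build a fresh instance through the primary constructor,
  // so nothing is mutated after construction.
  def fromFields(fields: Seq[Any]): Reading =
    Reading(fields(0).asInstanceOf[String], fields(1).asInstanceOf[Int])

  // True if the class exposes a public zero-argument constructor,
  // which the POJO path would require.
  def hasNoArgConstructor(c: Class[_]): Boolean =
    c.getConstructors.exists(_.getParameterCount == 0)

  def main(args: Array[String]): Unit = {
    val original = Reading("sensor-1", 42)
    val rebuilt = fromFields(toFields(original))
    println(rebuilt == original)                   // true
    println(hasNoArgConstructor(classOf[Reading])) // false
  }
}
```

The point of the design is that the constructor is the only write path, which is exactly what an immutable class already offers.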
Hi Stephan!

Here's the trace (Flink 0.9.1 + Scala 2.10.5):

```
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Cannot instantiate StreamRecord.
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:63)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:66)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Cannot instantiate class.
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:225)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:60)
    ... 4 more
Caused by: java.lang.IllegalArgumentException: Can not set int field org.myorg.quickstart.Test$X.id to org.myorg.quickstart.Test$Id
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
    at sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:98)
    at java.lang.reflect.Field.set(Field.java:764)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:232)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:221)
    ... 5 more
```
Hi Jack,

Stephan is right; this should work. Unfortunately, the TypeAnalyzer does not correctly detect that it cannot treat your Id class as a POJO. I will open a JIRA issue for that.

For the time being, you can use this command to force the system to use Kryo:

I hope this helps.

Regards,
Aljoscha

On Wed, 23 Sep 2015 at 13:37 Stephan Ewen <[hidden email]> wrote:
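The mismatch is visible in the last `Caused by` line of the trace: when a value class is stored in a field, Scala erases it to its underlying type, so the JVM field `X.id` is a plain `int` even though its Scala type is `Id`. A reflection-based serializer that believes the field holds an `Id` and calls `Field.set` with a boxed `Id` object is then rejected by the JVM. The plain-Scala sketch below reproduces that with standard Java reflection and no Flink dependency; `ErasureDemo`, `fieldType` and `trySetWithBoxedId` are hypothetical names.

```scala
object ErasureDemo {
  // Same shapes as in the original example.
  class Id(val underlying: Int) extends AnyVal
  class X(var id: Id) {
    def this() = this(new Id(0))
  }

  // JVM-level type of X's backing field `id` (erased from Id to int).
  def fieldType: Class[_] = classOf[X].getDeclaredField("id").getType

  // Try to set the field reflectively with a boxed Id, as a serializer that
  // treats the field as type Id would. Returns the rejection message, if any.
  def trySetWithBoxedId(): Option[String] = {
    val field = classOf[X].getDeclaredField("id")
    field.setAccessible(true)
    // Ascribing to Any forces Scala to allocate the Id wrapper object.
    val boxed: AnyRef = (new Id(1): Any).asInstanceOf[AnyRef]
    try { field.set(new X(), boxed); None }
    catch { case e: IllegalArgumentException => Some(e.getMessage) }
  }

  def main(args: Array[String]): Unit = {
    println(fieldType)           // int
    println(trySetWithBoxedId()) // Some(<IllegalArgumentException message>)
  }
}
```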
Thanks Aljoscha, that works!

I also tried passing values to a base-class constructor. Modifying the previous example slightly:

```scala
class Base(var a: Int)

class X(b: Int) extends Base(b) {
  def this() = this(0)
}
```

The code runs (even without Kryo), but the compiler complains about `b` being immutable.
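A sketch of why the base-class variant can go through the POJO route, assuming Flink's POJO analysis wants a public no-argument constructor plus public fields or getter/setter pairs (for a Scala `var a`, the compiler emits `a()` and `a_$eq(int)`). The hypothetical helper `rebuildViaSetter` mimics a reflection-based serializer: it creates `X` through the no-arg constructor and then assigns the inherited field through its generated setter. This is an illustration under that assumption, not Flink's code.

```scala
object BaseClassDemo {
  // Mutable state lives in the base class, so the JVM field `a` is settable.
  class Base(var a: Int)

  // The subclass only forwards its argument; `b` does not become a field.
  class X(b: Int) extends Base(b) {
    def this() = this(0) // public no-arg constructor
  }

  // Mimics a POJO-style serializer: construct an empty instance, then call
  // the setter scalac generates for `var a` (JVM name `a_$eq`).
  def rebuildViaSetter(value: Int): X = {
    val x = classOf[X].getConstructor().newInstance()
    classOf[X].getMethod("a_$eq", classOf[Int]).invoke(x, Int.box(value))
    x
  }

  def main(args: Array[String]): Unit = {
    println(rebuildViaSetter(7).a) // 7
  }
}
```

Keeping the `var` in `Base` confines mutability to one place, while code that only sees `X` through its constructor can still treat it as a plain value.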