FYI: The Runner registration has been fixed. The Flink runner explicitly registers as of [1].
Also, the SDK tries to look up the PipelineRunner class in case it has not been registered [2].

[1] https://github.com/apache/incubator-beam/pull/40
[2] https://github.com/apache/incubator-beam/pull/61
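For reference, runner registration in the SDK is discovered through a ServiceLoader hook. A minimal sketch of what an explicit registration for the Flink runner might look like, assuming the Dataflow SDK's PipelineRunnerRegistrar interface and AutoService (the interface and method names are approximations, not the actual code in [1]):

import com.google.auto.service.AutoService;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.flink.FlinkPipelineRunner;

// Hypothetical registrar: lets PipelineOptionsFactory resolve
// --runner=FlinkPipelineRunner by its short class name.
@AutoService(PipelineRunnerRegistrar.class)
public class FlinkRunnerRegistrar implements PipelineRunnerRegistrar {
  @Override
  public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
    return ImmutableList.<Class<? extends PipelineRunner<?>>>of(FlinkPipelineRunner.class);
  }
}

With a registrar like this on the classpath, --runner=FlinkPipelineRunner resolves by short name; with the lookup added in [2], the fully qualified class name works even without it.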
On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[hidden email]> wrote:
> Great to see such a lively discussion here.
>
> I think we'll support sinks through the Write interface (like in batched execution)
> and also have a dedicated wrapper for the Flink sinks. This is a very pressing but
> easy-to-solve issue of the Flink runner. Expect it to land next week.
>
> Also, the proper registration of the runner is about to be merged.
> We just need an OK from the contributor to merge the changes.
>
> Best,
> Max
>
> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[hidden email]> wrote:
>> Thanks Bill!
>>
>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not blocking you!
>>
>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy <[hidden email]> wrote:
>>>
>>> I tried that, but still no dice. Just to be clear, it's not a blocker for me,
>>> given that I have my example running, but for your information the exception is below.
>>>
>>> I'll watch the commit log on the Beam incubator and look forward to deleting
>>> my copy of Raghu's contributions when they're merged to master.
>>>
>>> Thanks again for everyone's help,
>>>
>>> Bill
>>>
>>> Command followed by exception:
>>>
>>> $ flink run -c
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>> target/beam-1.0-SNAPSHOT.jar
>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>
>>> ------------------------------------------------------------
>>> The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>> at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> ... 6 more
>>>
>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[hidden email]> wrote:
>>>
>>> I don't believe the FlinkPipelineRunner is registered the same way the Dataflow & Direct
>>> Pipeline runners are registered; using org.apache.beam.runners.flink.FlinkPipelineRunner should work.
>>>
>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy <[hidden email]> wrote:
>>>>
>>>> Thanks Dan,
>>>>
>>>> I tried that, but I'm getting the below. Note that the jar contains the FlinkPipelineRunner.
>>>>
>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>
>>>> % flink run -c
>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>
>>>> ------------------------------------------------------------
>>>> The program finished with the following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>>> at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>> at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>> at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>> ... 6 more
>>>>
>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[hidden email]> wrote:
>>>>
>>>> Thanks for catching that, Aljoscha!
>>>>
>>>> Note that the Flink runner should be available via a command-line option as well: --runner=FlinkPipelineRunner.
>>>>
>>>> The list of valid values for that flag is computed by walking the classpath at runtime,
>>>> so as long as the Flink jar is present it'll work.
>>>>
>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[hidden email]> wrote:
>>>>>
>>>>> Hi,
>>>>> looks like the example is being executed with the DirectPipelineRunner which does not
>>>>> seem to be able to cope with UnboundedSource. You need to set the runner to the
>>>>> FlinkRunner in the example code as described here:
>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>
>>>>> The Flink runner should be able to deal with UnboundedSource but has the limitation
>>>>> that sources are always parallelism=1 (this is being worked on, however).
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
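Setting the runner in code, as the README linked above suggests, would look roughly like the sketch below. The class names come from the jar listing and stack traces earlier in this thread; the wiring (fromArgs/as/setRunner) is an assumption about the SDK's options API, not the actual TopHashtagsExample code.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineRunner;

public class RunOnFlink {
  public static void main(String[] args) {
    // Hypothetical wiring: parse the command line, then pin the runner in code
    // so the pipeline never falls back to the DirectPipelineRunner.
    // (A real example would register its own options interface for flags
    // such as --bootstrapServers and --outputTopic.)
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkPipelineRunner.class);

    Pipeline p = Pipeline.create(options);
    // ... apply transforms ...
    p.run();
  }
}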
>>>>> > On 18 Mar 2016, at 20:56, Dan Halperin <[hidden email]> wrote:
>>>>> >
>>>>> > Looks like the Flink runner may not yet support arbitrary code written with the
>>>>> > UnboundedSource API. That is, it looks like the Flink runner expects the sources
>>>>> > to get translated away.
>>>>> >
>>>>> > Max?
>>>>> >
>>>>> > Dan
>>>>> >
>>>>> > On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy <[hidden email]> wrote:
>>>>> > Thanks Raghu,
>>>>> >
>>>>> > When I try to run it on Flink using the incubator-beam code, i.e.
>>>>> >
>>>>> > flink run -c
>>>>> > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> > target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>> > --topics=test_in --outputTopic=test_out
>>>>> >
>>>>> > I get this:
>>>>> >
>>>>> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>> > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> > at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> > at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> > at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>> > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> > Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>>>>> > at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>> > at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>> > at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>> > at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>> > at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>> > at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>> > at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>> > at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>> > at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>> > at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>> > at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> > at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> > ... 6 more
>>>>> >
>>>>> > Any ideas?
>>>>> >
>>>>> > Bill
>>>>> >
>>>>> >> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[hidden email]> wrote:
>>>>> >>
>>>>> >> Thanks for trying it.
>>>>> >>
>>>>> >> I fixed the CheckStyle error (not sure why my build is not failing). Let me know if
>>>>> >> you see any issues running with Beam. I haven't tried it; I should. In fact, Daniel
>>>>> >> Halperin says my patch should be against Beam.
>>>>> >>
>>>>> >> Raghu.
>>>>> >>
>>>>> >> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy <[hidden email]> wrote:
>>>>> >> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for pointing me to working code.
>>>>> >>
>>>>> >> I'm in the middle of a hack day at the moment, so the speed of your responses has been very welcome.
>>>>> >>
>>>>> >> In the first instance, I'll try using your changes, Raghu. I've cloned your repo,
>>>>> >> switched to the kafka branch and built both contrib/kafka and contrib/examples/kafka.
>>>>> >> The contrib/kafka build initially failed with a CheckStyle error
>>>>> >> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>> >> 'private' modifier out of order with the JLS suggestions)... I've fixed that in my
>>>>> >> local clone and now it's building fine. I hope to be able to run your contrib unchanged
>>>>> >> on top of the incubator-beam codebase, which is what I'll attempt to do now.
>>>>> >>
>>>>> >> Thanks again to all for your swift help.
>>>>> >>
>>>>> >> Bill
>>>>> >>
>>>>> >>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[hidden email]> wrote:
>>>>> >>>
>>>>> >>> Hi Bill,
>>>>> >>>
>>>>> >>> We have a fairly well-tested patch for KafkaIO (PR #121). It will be merged soon.
>>>>> >>> The example there keeps track of top hashtags in a 10-minute sliding window and
>>>>> >>> writes the results to another Kafka topic. Please try it if you can. It is well
>>>>> >>> tested on Google Cloud Dataflow; I have not run it using the Flink runner.
>>>>> >>>
>>>>> >>> Raghu.
>>>>> >>>
>>>>> >>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas <[hidden email]> wrote:
>>>>> >>> Hello Bill,
>>>>> >>>
>>>>> >>> This is a known limitation of the Flink Runner. There is a JIRA issue for that:
>>>>> >>> https://issues.apache.org/jira/browse/BEAM-127
>>>>> >>>
>>>>> >>> A wrapper for Flink sinks will come soon and, as Beam evolves, a more Beam-y solution will come as well.
>>>>> >>>
>>>>> >>> Kostas
>>>>> >>>
>>>>> >>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <[hidden email]> wrote:
>>>>> >>>>
>>>>> >>>> Hi,
>>>>> >>>>
>>>>> >>>> I'm trying to write a proof-of-concept which takes messages from Kafka, transforms
>>>>> >>>> them using Beam on Flink, then pushes the results onto a different Kafka topic.
>>>>> >>>>
>>>>> >>>> I've used the KafkaWindowedWordCountExample as a starting point, and that's doing
>>>>> >>>> the first part of what I want to do, but it outputs to text files as opposed to Kafka.
>>>>> >>>> FlinkKafkaProducer08 looks promising, but I can't figure out how to plug it into the
>>>>> >>>> pipeline. I was thinking that it would be wrapped with an UnboundedFlinkSink, or some
>>>>> >>>> such, but that doesn't seem to exist.
>>>>> >>>>
>>>>> >>>> Any advice or thoughts on what I'm trying to do?
>>>>> >>>>
>>>>> >>>> I'm running the latest incubator-beam (as of last night from GitHub), Flink 1.0.0 in
>>>>> >>>> cluster mode and Kafka 0.9.0.1, all on Google Compute Engine (Debian Jessie).
>>>>> >>>>
>>>>> >>>> Thanks,
>>>>> >>>>
>>>>> >>>> Bill McCarthy
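Until the Flink sink wrapper and the Beam-native Kafka sink discussed above land, one stop-gap for the "write back to Kafka" half of the question is to publish from a plain DoFn using the Kafka producer client directly. The sketch below is untested and is not taken from any of the patches mentioned in this thread; it trades delivery guarantees for simplicity, creating a producer per bundle inside a ParDo.

import java.util.Properties;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical stop-gap sink: publishes each element of a PCollection<String>
// to a Kafka topic from inside a ParDo. Not exactly-once; purely illustrative.
public class KafkaPublishFn extends DoFn<String, Void> {
  private final String bootstrapServers;
  private final String topic;
  private transient KafkaProducer<String, String> producer;

  public KafkaPublishFn(String bootstrapServers, String topic) {
    this.bootstrapServers = bootstrapServers;
    this.topic = topic;
  }

  @Override
  public void startBundle(Context c) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    producer = new KafkaProducer<>(props);
  }

  @Override
  public void processElement(ProcessContext c) {
    producer.send(new ProducerRecord<String, String>(topic, c.element()));
  }

  @Override
  public void finishBundle(Context c) {
    producer.flush();
    producer.close();
  }
}

Applied to a PCollection<String> of formatted results, roughly: results.apply(ParDo.of(new KafkaPublishFn(bootstrapServers, outputTopic))), where the two arguments would come from the example's own pipeline options.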
Sorry. Wrong mailing list...
On Mon, Mar 21, 2016 at 11:47 AM, Maximilian Michels <[hidden email]> wrote:
> FYI: The Runner registration has been fixed. The Flink runner explicitly registers as of [1].
> Also, the SDK tries to look up the PipelineRunner class in case it has not been registered [2].
>
> [1] https://github.com/apache/incubator-beam/pull/40
> [2] https://github.com/apache/incubator-beam/pull/61