Re: Output from Beam (on Flink) to Kafka

Re: Output from Beam (on Flink) to Kafka

Maximilian Michels
FYI: The runner registration has been fixed. The Flink runner
registers itself explicitly as of [1]. Also, the SDK tries to look up the
PipelineRunner class in case it has not been registered [2].

[1] https://github.com/apache/incubator-beam/pull/40
[2] https://github.com/apache/incubator-beam/pull/61
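
To illustrate the two lookup paths described above (a runner registered under
its short name, and a fallback that looks the class up by name when it was not
registered), here is a minimal sketch. It is not the SDK's actual code: the
classes DemoRunner, DemoDirectRunner, DemoFlinkRunner and RunnerLookup are
hypothetical stand-ins, and the error text only imitates the message quoted
further down in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the SDK's runner base class (assumption, not the Beam type). */
abstract class DemoRunner {}

/** Hypothetical runner that gets registered explicitly. */
class DemoDirectRunner extends DemoRunner {}

/** Hypothetical runner that is on the classpath but never registered. */
class DemoFlinkRunner extends DemoRunner {}

/** Resolves the value of a --runner style option to a runner class. */
final class RunnerLookup {
    // Registered runners, keyed by simple class name (what a user would type).
    private final Map<String, Class<? extends DemoRunner>> registered = new LinkedHashMap<>();

    void register(Class<? extends DemoRunner> runner) {
        registered.put(runner.getSimpleName(), runner);
    }

    Class<? extends DemoRunner> resolve(String name) {
        // Path 1: the runner was registered under its simple name.
        Class<? extends DemoRunner> byName = registered.get(name);
        if (byName != null) {
            return byName;
        }
        // Path 2: treat the value as a class name and look it up on the classpath.
        try {
            Class<?> candidate = Class.forName(name);
            if (DemoRunner.class.isAssignableFrom(candidate)) {
                return candidate.asSubclass(DemoRunner.class);
            }
        } catch (ClassNotFoundException e) {
            // Not a loadable class; fall through to the error below.
        }
        throw new IllegalArgumentException(
            "Unknown 'runner' specified '" + name
                + "', supported pipeline runners " + registered.keySet());
    }

    public static void main(String[] args) {
        RunnerLookup lookup = new RunnerLookup();
        lookup.register(DemoDirectRunner.class);

        // Found through the registry.
        System.out.println(lookup.resolve("DemoDirectRunner").getName());
        // Not registered, found through the class lookup fallback.
        System.out.println(lookup.resolve(DemoFlinkRunner.class.getName()).getName());
    }
}
```

Without the fallback in resolve, the second call would fail in the same way as
the "Unknown 'runner' specified" exception reported below, even though the
class is present in the jar.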

On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[hidden email]> wrote:

> Great to see such a lively discussion here.
>
> I think we'll support sinks through the Write interface (like in
> batched execution) and also have a dedicated wrapper for the Flink
> sinks. This is a very pressing but easy-to-solve issue with the Flink
> runner. Expect it to land next week.
>
> Also, the proper registration of the runner is about to be merged.
> We just need an ok from the contributor to merge the changes.
>
> Best,
> Max
>
> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[hidden email]> wrote:
>> Thanks Bill!
>>
>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not
>> blocking you!
>>
>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>> <[hidden email]> wrote:
>>>
>>> I tried that, but still no dice. Just to be clear, it’s not a blocker for
>>> me, given that I have my example running, but for your information the
>>> exception is below.
>>>
>>> I’ll watch the commit log on the beam incubator and look forward to
>>> deleting my copy of Raghu’s contributions when they’re merged to master.
>>>
>>> Thanks again for everyone’s help,
>>>
>>> Bill
>>>
>>>
>>> Command followed by exception:
>>>
>>> $ flink run -c
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>> target/beam-1.0-SNAPSHOT.jar
>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>
>>> ------------------------------------------------------------
>>>  The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline
>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>> DirectPipelineRunner]
>>> at
>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>> at
>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>> at
>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>> at
>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>> at
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> ... 6 more
>>>
>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[hidden email]> wrote:
>>>
>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>> Dataflow & Direct Pipeline runners are registered; using
>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
>>>
>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>>> <[hidden email]> wrote:
>>>>
>>>> Thanks Dan,
>>>>
>>>> I tried that, but I'm getting the error below. Note that the jar contains the
>>>> FlinkPipelineRunner.
>>>>
>>>>
>>>>
>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>
>>>> % flink run -c
>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>
>>>> ------------------------------------------------------------
>>>>  The program finished with the following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>> method caused an error.
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>> at
>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>> at
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>>> 'FlinkPipelineRunner', supported pipeline runners
>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>> DirectPipelineRunner]
>>>> at
>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>> at
>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>> at
>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>> at
>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>> at
>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>> ... 6 more
>>>>
>>>>
>>>>
>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[hidden email]> wrote:
>>>>
>>>> Thanks for catching that, Aljoscha!
>>>>
>>>> Note that the Flink runner should be available via a command-line option
>>>> as well: --runner=FlinkPipelineRunner.
>>>>
>>>> The list of valid values for that flag is computed by walking the
>>>> classpath at runtime, so as long as the Flink jar is present it'll work.
>>>>
>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[hidden email]>
>>>> wrote:
>>>>>
>>>>> Hi,
>>>>> looks like the example is being executed with the DirectPipelineRunner
>>>>> which does not seem to be able to cope with UnboundedSource. You need to set
>>>>> the runner to the FlinkRunner in the example code as described here:
>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>
>>>>> The Flink runner should be able to deal with UnboundedSource but has the
>>>>> limitation that sources are always parallelism=1 (this is being worked on,
>>>>> however).
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>> > On 18 Mar 2016, at 20:56, Dan Halperin <[hidden email]> wrote:
>>>>> >
>>>>> > Looks like the Flink runner may not yet support arbitrary code written
>>>>> > with the UnboundedSource API. That is, it looks like the Flink runner
>>>>> > expects the sources to get translated away.
>>>>> >
>>>>> > Max?
>>>>> >
>>>>> > Dan
>>>>> >
>>>>> > On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>>>>> > <[hidden email]> wrote:
>>>>> > Thanks Raghu,
>>>>> >
>>>>> > When I try to run it on flink using the incubator-beam code, i.e.
>>>>> >
>>>>> > flink run -c
>>>>> > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> > target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>> > --topics=test_in --outputTopic=test_out
>>>>> >
>>>>> > I get this:
>>>>> >
>>>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> > method caused an error.
>>>>> >       at
>>>>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>> >       at
>>>>> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> >       at
>>>>> > org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> >       at
>>>>> > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> >       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> >       at
>>>>> > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>> >       at
>>>>> > org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> > Caused by: java.lang.IllegalStateException: no evaluator registered
>>>>> > for Read(UnboundedKafkaSource)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>> >       at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>> >       at
>>>>> > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>> >       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> >       at
>>>>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> >       at
>>>>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> >       at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> >       at
>>>>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> >       ... 6 more
>>>>> >
>>>>> > Any ideas?
>>>>> >
>>>>> > Bill
>>>>> >
>>>>> >> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[hidden email]> wrote:
>>>>> >>
>>>>> >> Thanks for trying it.
>>>>> >>
>>>>> >> I fixed the CheckStyle error (not sure why my build is not failing).
>>>>> >> Let me know if you see any issues running with Beam. I haven't tried it. I
>>>>> >> should. In fact Daniel Halperin says my patch should be against Beam.
>>>>> >>
>>>>> >> Raghu.
>>>>> >>
>>>>> >> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>>>>> >> <[hidden email]> wrote:
>>>>> >> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>>>> >> pointing me to working code.
>>>>> >>
>>>>> >> I’m in the middle of a hack day at the moment, so the speed of your
>>>>> >> responses has been very welcome.
>>>>> >>
>>>>> >> In the first instance, I’ll try using your changes, Raghu. I’ve
>>>>> >> cloned your repo, switched to the kafka branch and built both contrib/kafka
>>>>> >> and contrib/examples/kafka. The contrib/kafka initially failed with a
>>>>> >> CheckStyle error
>>>>> >> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>> >> 'private' modifier out of order with the JLS suggestions)… I’ve fixed that
>>>>> >> in my local clone and now it’s building fine. I hope to be able to run your
>>>>> >> contrib unchanged on top of the incubator-beam codebase, which will be what
>>>>> >> I attempt to do now.
>>>>> >>
>>>>> >> Thanks again to all, for your swift help.
>>>>> >>
>>>>> >> Bill
>>>>> >>
>>>>> >>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[hidden email]>
>>>>> >>> wrote:
>>>>> >>>
>>>>> >>> Hi Bill,
>>>>> >>>
>>>>> >>> We have a fairly well-tested patch for KafkaIO (pr #121). It will be
>>>>> >>> merged soon. The example there keeps track of top hashtags in a 10-minute
>>>>> >>> sliding window and writes the results to another Kafka topic. Please try it
>>>>> >>> if you can. It is well tested on Google Cloud Dataflow. I have not run it
>>>>> >>> using the Flink runner.
>>>>> >>>
>>>>> >>> Raghu.
>>>>> >>>
>>>>> >>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>>>>> >>> <[hidden email]> wrote:
>>>>> >>> Hello Bill,
>>>>> >>>
>>>>> >>> This is a known limitation of the Flink Runner.
>>>>> >>> There is a JIRA issue for that
>>>>> >>> https://issues.apache.org/jira/browse/BEAM-127
>>>>> >>>
>>>>> >>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>>>> >>> a more Beam-y solution will come as well.
>>>>> >>>
>>>>> >>> Kostas
>>>>> >>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>>>>> >>>> <[hidden email]> wrote:
>>>>> >>>>
>>>>> >>>> Hi,
>>>>> >>>>
>>>>> >>>> I’m trying to write a proof-of-concept which takes messages from
>>>>> >>>> Kafka, transforms them using Beam on Flink, then pushes the results onto a
>>>>> >>>> different Kafka topic.
>>>>> >>>>
>>>>> >>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>>>>> >>>> and that’s doing the first part of what I want to do, but it outputs to text
>>>>> >>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t
>>>>> >>>> figure out how to plug it into the pipeline. I was thinking that it would be
>>>>> >>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to
>>>>> >>>> exist.
>>>>> >>>>
>>>>> >>>> Any advice or thoughts on what I’m trying to do?
>>>>> >>>>
>>>>> >>>> I’m running the latest incubator-beam (as of last night from
>>>>> >>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
>>>>> >>>> Compute Engine (Debian Jessie).
>>>>> >>>>
>>>>> >>>> Thanks,
>>>>> >>>>
>>>>> >>>> Bill McCarthy
>>>>> >>>
>>>>> >>>
>>>>> >>
>>>>> >>
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>>
>>

Re: Output from Beam (on Flink) to Kafka

Maximilian Michels
Sorry. Wrong mailing list...

On Mon, Mar 21, 2016 at 11:47 AM, Maximilian Michels <[hidden email]> wrote:

> FYI: The Runner registration has been fixed. The Flink runner
> explicitly registers as of [1]. Also, the SDK tries to look up the
> PipelineRunner class in case it has not been registered [2].
>
> [1] https://github.com/apache/incubator-beam/pull/40
> [2] https://github.com/apache/incubator-beam/pull/61