Re: Integrate Flink with S3 on EMR cluster

Posted by Timur Fayruzov
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Integrate-Flink-with-S3-on-EMR-cluster-tp5894p5926.html

Yes, the Hadoop version was the culprit. It turns out that EMRFS requires at least Hadoop 2.4.0. Judging from the exception in the initial post, the missing method is the Configuration#addResource(Configuration) overload that EmrFileSystem.initialize calls; I was not able to find the official requirements.
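
For anyone who wants to check a given Hadoop client quickly: a reflection probe along these lines (my own sketch, not code from EMRFS; it only needs hadoop-common on the classpath) tells you whether the bundled Hadoop has that overload:

import org.apache.hadoop.conf.Configuration;

// Sketch: succeeds on Hadoop clients that ship the
// addResource(Configuration) overload EmrFileSystem.initialize calls,
// and throws NoSuchMethodException on older ones.
public class AddResourceProbe {
  public static void main(String[] args) throws Exception {
    Configuration.class.getMethod("addResource", Configuration.class);
    System.out.println("addResource(Configuration) is available");
  }
}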

Rebuilding Flink with Hadoop 2.7.1 and Scala 2.11 worked like a charm, and I was able to run WordCount with S3 for both input and output. I did *not* need to change any configuration (as outlined at https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/connectors.html).
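
For reference, the rebuild boils down to something like this (a sketch following the Flink 1.0 build docs; the change-scala-version.sh script and exact flags may differ for your source checkout):

# Build Flink against Hadoop 2.7.1 with Scala 2.11
git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.0.0
tools/change-scala-version.sh 2.11
mvn clean install -DskipTests -Dhadoop.version=2.7.1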

Thanks for bearing with me, Ufuk!

While looking for the solution I found this issue: https://issues.apache.org/jira/browse/FLINK-1337. I have a working EMR cluster setup now, so I can make a PR describing it, if it's still relevant. I, for one, would have saved a couple of days if I had a guide like that.

Thanks,
Timur


On Tue, Apr 5, 2016 at 10:43 AM, Ufuk Celebi <[hidden email]> wrote:
Hey Timur,

if you are using EMR with IAM roles, Flink should work out of the box.
You don't need to change the Hadoop config; the IAM role takes care of
setting up all credentials at runtime. That way you don't have to
hardcode any keys in your application, which is the recommended
approach: you don't need to worry about securely exchanging the keys
and then keeping them secure afterwards.

With EMR 4.4.0 you have to use a Flink binary version built against
Hadoop 2.7. Did you do that? Can you please retry with an
out-of-the-box Flink and just run it like this:

HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink etc.

Hope this helps! Please report back. :-)

– Ufuk


On Tue, Apr 5, 2016 at 5:47 PM, Timur Fayruzov <[hidden email]> wrote:
> Hello Ufuk,
>
> I'm using EMR 4.4.0.
>
> Thanks,
> Timur
>
> On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <[hidden email]> wrote:
>>
>> Hey Timur,
>>
>> which EMR version are you using?
>>
>> – Ufuk
>>
>> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <[hidden email]>
>> wrote:
>> > Thanks for the answer, Ken.
>> >
>> > My understanding is that file system selection is driven by the
>> > following
>> > sections in core-site.xml:
>> > <property>
>> >   <name>fs.s3.impl</name>
>> >   <!-- Original value: <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value> -->
>> >   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
>> > </property>
>> >
>> > <property>
>> >   <name>fs.s3n.impl</name>
>> >   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
>> > </property>
>> >
>> > If I run the program using the configuration above with s3n (also
>> > modifying the credential keys to use s3n), it fails with the same
>> > error, but there are no "... opening key ..." log messages. s3a does
>> > not seem to be supported; it fails with the following:
>> > Caused by: java.io.IOException: No file system found with scheme s3a, referenced in file URI 's3a://<my key>'.
>> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
>> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>> > at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>> > at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>> > at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>> > ... 23 more
>> >
>> > I am puzzled by the fact that EMRFS is apparently still referenced
>> > somewhere as an implementation of the S3 protocol; I'm not able to
>> > locate where this configuration is set.
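>> >
>> > One way to hunt for where fs.s3.impl gets wired up on an EMR node is
>> > to grep the Hadoop and EMRFS config directories, e.g. (a sketch; the
>> > EMRFS path is a guess and may vary by EMR release):
>> >
>> > grep -R "fs.s3" /etc/hadoop/conf /usr/share/aws/emr/emrfs/conf 2>/dev/null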
>> >
>> >
>> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler
>> > <[hidden email]>
>> > wrote:
>> >>
>> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol,
>> >> not s3://.
>> >>
>> >> Though EMR has some support for magically treating the s3 protocol
>> >> as s3n (or maybe s3a now, with Hadoop 2.6 or later).
>> >>
>> >> What happens if you use s3n://<key info>/<path to file> for the
>> >> --input parameter?
>> >>
>> >> — Ken
>> >>
>> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <[hidden email]>
>> >> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
>> >> succeeded with a three-step procedure: load the data from S3 into
>> >> the cluster's HDFS, run the Flink job, then unload the outputs from
>> >> HDFS back to S3.
>> >>
>> >> However, ideally I'd like to read/write data directly from/to S3. I
>> >> followed the instructions at
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
>> >> more specifically I:
>> >>   1. Modified flink-conf to point to /etc/hadoop/conf
>> >>   2. Modified core-site.xml per the link above (it is not clear why
>> >> it is not using IAM; I had to provide the AWS keys explicitly).
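>> >>
>> >> Roughly, the two changes look like this (a sketch; key names per the
>> >> Flink 1.0 docs linked above, values are placeholders):
>> >>
>> >> # flink-conf.yaml
>> >> fs.hdfs.hadoopconf: /etc/hadoop/conf
>> >>
>> >> <!-- core-site.xml -->
>> >> <property>
>> >>   <name>fs.s3.impl</name>
>> >>   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
>> >> </property>
>> >> <property>
>> >>   <name>fs.s3.awsAccessKeyId</name>
>> >>   <value>YOUR_ACCESS_KEY</value>
>> >> </property>
>> >> <property>
>> >>   <name>fs.s3.awsSecretAccessKey</name>
>> >>   <value>YOUR_SECRET_KEY</value>
>> >> </property>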
>> >>
>> >> Run the following command:
>> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input s3://<my key> --output hdfs:///flink-output
>> >>
>> >> First, I see messages like this:
>> >> 2016-04-04 21:37:10,418 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem - Opening key '<my key>' for reading at position '333000'
>> >>
>> >> Then, it fails with the following error:
>> >>
>> >> ------------------------------------------------------------
>> >>  The program finished with the following exception:
>> >>
>> >> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> >> at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> >> at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> at java.lang.reflect.Method.invoke(Method.java:606)
>> >> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>> >> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>> >> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>> >> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
>> >> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>> >> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>> >> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> >> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> >> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> >> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> >> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> >> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>> >> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>> >> at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
>> >> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
>> >> ... 21 more
>> >> Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>> >> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:96)
>> >> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
>> >> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:291)
>> >> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>> >> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>> >> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>> >> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>> >> ... 23 more
>> >>
>> >>
>> >> Somehow, it still tries to use EMRFS (which may be a valid thing?),
>> >> but it fails to initialize. I don't know enough about the EMRFS/S3
>> >> interop to diagnose it further.
>> >>
>> >> I'm running Flink 1.0.0 compiled for Scala 2.11.
>> >>
>> >> Any advice on how to make it work is highly appreciated.
>> >>
>> >>
>> >> Thanks,
>> >>
>> >> Timur
>> >>
>> >>
>> >>
>> >>
>> >> --------------------------
>> >> Ken Krugler
>> >> +1 530-210-6378
>> >> http://www.scaleunlimited.com
>> >> custom big data solutions & training
>> >> Hadoop, Cascading, Cassandra & Solr