Hey Timur,

On EMR, the preconfigured Hadoop setup takes care of setting up all
credentials at runtime. You don't need to hardcode the AWS keys in
core-site.xml.

Regarding the NoSuchMethodError: EMR 4.4.0 runs Hadoop 2.7.x, so you
need the Flink binary built against Hadoop 2.7. Did you do that? Can
you please retry with a Hadoop 2.7 build of Flink?

Hope this helps! Please report back. :-)

– Ufuk
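
PS: For reference, switching to the Hadoop 2.7 build would look roughly
like this (file name assumed from the flink-1.0.0 release downloads;
adjust for your Scala version):

  # fetch the Flink 1.0.0 binary built against Hadoop 2.7 (Scala 2.11)
  wget http://archive.apache.org/dist/flink/flink-1.0.0/flink-1.0.0-bin-hadoop27-scala_2.11.tgz
  tar xzf flink-1.0.0-bin-hadoop27-scala_2.11.tgz
  # then rerun the job with this binary in place of the old one
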
> Hello Ufuk,
>
> I'm using EMR 4.4.0.
>
> Thanks,
> Timur
>
> On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <[hidden email]> wrote:
>>
>> Hey Timur,
>>
>> which EMR version are you using?
>>
>> – Ufuk
>>
>> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <[hidden email]> wrote:
>> > Thanks for the answer, Ken.
>> >
>> > My understanding is that file system selection is driven by the
>> > following sections in core-site.xml:
>> >
>> > <property>
>> >   <name>fs.s3.impl</name>
>> >   <!-- original value: <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value> -->
>> >   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
>> > </property>
>> >
>> > <property>
>> >   <name>fs.s3n.impl</name>
>> >   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
>> > </property>
>> >
>> > If I run the program with the configuration above using s3n (also
>> > modifying the credential keys to use s3n), it fails with the same
>> > error, but there are no "... opening key ..." log lines. s3a seems
>> > not to be supported; it fails with the following:
>> > Caused by: java.io.IOException: No file system found with scheme s3a, referenced in file URI 's3a://<my key>'.
>> >   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
>> >   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>> >   at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>> >   at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>> >   at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>> >   ... 23 more
>> >
>> > I am puzzled that EMRFS is still apparently referenced somewhere as
>> > an implementation for the S3 protocol, but I'm not able to locate
>> > where this configuration is set.
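>> >
>> > Is there a recommended way to trace where it comes from? The best I
>> > can think of is grepping the client configs, e.g.:
>> >
>> > grep -R "fs.s3" /etc/hadoop/conf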
>> >
>> >
>> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler <[hidden email]> wrote:
>> >>
>> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol,
>> >> not s3.
>> >>
>> >> Though EMR has some support for magically treating the s3 protocol
>> >> as s3n (or maybe s3a now, with Hadoop 2.6 or later).
>> >>
>> >> What happens if you use s3n://<key info>/<path to file> for the
>> >> --input parameter?
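>> >>
>> >> I.e. something like this (bucket and path made up):
>> >>
>> >> flink run ... WordCount.jar --input s3n://my-bucket/path/to/input --output hdfs:///flink-output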
>> >>
>> >> — Ken
>> >>
>> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <[hidden email]> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
>> >> succeeded with a three-step procedure: load data from S3 to the
>> >> cluster's HDFS, run the Flink job, unload outputs from HDFS to S3.
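>> >>
>> >> Roughly, with made-up bucket names and paths, the three steps were:
>> >>
>> >> hadoop distcp s3://my-bucket/input hdfs:///wc-input
>> >> flink run -m yarn-cluster -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input hdfs:///wc-input --output hdfs:///wc-output
>> >> hadoop distcp hdfs:///wc-output s3://my-bucket/output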
>> >>
>> >> However, ideally I'd like to read/write data directly from/to S3. I
>> >> followed the instructions here:
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
>> >> More specifically, I:
>> >> 1. Modified flink-conf to point to /etc/hadoop/conf
>> >> 2. Modified core-site.xml per the link above (it's not clear why it
>> >> is not using IAM; I had to provide the AWS keys explicitly). The
>> >> resulting edits are sketched below.
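>> >>
>> >> A rough sketch of those edits, with the actual key values elided:
>> >>
>> >> # flink-conf.yaml
>> >> fs.hdfs.hadoopconf: /etc/hadoop/conf
>> >>
>> >> <!-- core-site.xml -->
>> >> <property>
>> >>   <name>fs.s3.impl</name>
>> >>   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
>> >> </property>
>> >> <property>
>> >>   <name>fs.s3.awsAccessKeyId</name>
>> >>   <value>...</value>
>> >> </property>
>> >> <property>
>> >>   <name>fs.s3.awsSecretAccessKey</name>
>> >>   <value>...</value>
>> >> </property>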
>> >>
>> >> Then I ran the following command:
>> >>
>> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input s3://<my key> --output hdfs:///flink-output
>> >>
>> >> First, I see messages like this:
>> >>
>> >> 2016-04-04 21:37:10,418 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem - Opening key '<my key>' for reading at position '333000'
>> >>
>> >> Then, it fails with the following error:
>> >>
>> >> ------------------------------------------------------------
>> >>  The program finished with the following exception:
>> >>
>> >> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>> >>   at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> >>   at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> >>   at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> >>   at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> >>   at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
>> >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>   at java.lang.reflect.Method.invoke(Method.java:606)
>> >>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>   at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>> >>   at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>> >>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>> >>   at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >>   at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
>> >>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>> >>   at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>> >>   at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> >>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> >>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> >>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> >>   at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>> >>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> >>   at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>> >>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> >>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> >>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>> >>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>> >>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> >>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>> >>   at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>> >>   at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
>> >>   at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
>> >>   ... 21 more
>> >> Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
>> >>   at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:96)
>> >>   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
>> >>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:291)
>> >>   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>> >>   at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>> >>   at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>> >>   at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>> >>   ... 23 more
>> >>
>> >>
>> >> Somehow it still tries to use EMRFS (which may be a valid thing?),
>> >> but EMRFS fails to initialize. I don't know enough about EMRFS/S3
>> >> interop to diagnose it further.
>> >>
>> >> I'm running Flink 1.0.0 compiled for Scala 2.11.
>> >>
>> >> Any advice on how to make it work is highly appreciated.
>> >>
>> >>
>> >> Thanks,
>> >>
>> >> Timur
>> >>
>> >>
>> >>
>> >>
>> >> --------------------------
>> >> Ken Krugler
>> >> +1 530-210-6378
>> >> http://www.scaleunlimited.com
>> >> custom big data solutions & training
>> >> Hadoop, Cascading, Cassandra & Solr