Re: Integrate Flink with S3 on EMR cluster

Posted by Ken Krugler on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Integrate-Flink-with-S3-on-EMR-cluster-tp5894p5895.html

Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not s3.

Though EMR has some support for magically treating the s3 protocol as s3n (or maybe s3a now, with Hadoop 2.6 or later).

What happens if you use s3n://<key info>/<path to file> for the --input parameter?
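To illustrate why the protocol prefix matters: a Hadoop-style client typically picks the FileSystem implementation from the URI scheme by looking up a key of the form fs.<scheme>.impl. The sketch below is a hypothetical, self-contained model of that lookup. A plain Map stands in for Hadoop's Configuration, and the SchemeResolver class is made up for illustration. The two mappings are assumptions chosen to match this thread: s3:// resolving to EMRFS (EmrFileSystem appears in the stack trace below) and s3n:// resolving to NativeS3FileSystem (which appears in the log line below). Real Hadoop also falls back to service-loaded defaults, which this sketch ignores.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: models how a Hadoop-style client chooses a FileSystem
 * implementation from the URI scheme via an "fs.<scheme>.impl" key.
 * The Map stands in for org.apache.hadoop.conf.Configuration (assumption).
 */
public class SchemeResolver {

    /** Returns the implementation class configured for the path's scheme, or null if none. */
    public static String resolveImpl(Map<String, String> conf, String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null) {
            // Relative path without a scheme; a real client would fall back to fs.defaultFS.
            return null;
        }
        return conf.get("fs." + scheme + ".impl");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Assumed EMR-style mapping: s3:// goes to EMRFS, as seen in the stack trace.
        conf.put("fs.s3.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
        // Assumed mapping: s3n:// goes to Hadoop's native S3 client, as seen in the log line.
        conf.put("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");

        System.out.println(resolveImpl(conf, "s3://my-bucket/input.txt"));
        System.out.println(resolveImpl(conf, "s3n://my-bucket/input.txt"));
    }
}
```

Under that assumed mapping, main prints the EmrFileSystem class for the s3:// path and NativeS3FileSystem for the s3n:// path, which is why switching the --input prefix would likely route the job away from EmrFileSystem.initialize, provided the cluster's configuration actually looks like this.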

— Ken

On Apr 4, 2016, at 2:51pm, Timur Fayruzov <[hidden email]> wrote:

Hello,

I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded with a three-step procedure: load the data from S3 into the cluster's HDFS, run the Flink job, then unload the outputs from HDFS to S3.

However, ideally I'd like to read/write data directly from/to S3. I followed the instructions here: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html, more specifically I:
  1. Modified flink-conf to point to /etc/hadoop/conf
  2. Modified core-site.xml per the link above (it's not clear why it is not using IAM; I had to provide AWS keys explicitly).

I ran the following command:
HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input s3://<my key> --output hdfs:///flink-output

First, I see messages like this:
2016-04-04 21:37:10,418 INFO  org.apache.hadoop.fs.s3native.NativeS3FileSystem              - Opening key '<my key>' for reading at position '333000'
 
Then, it fails with the following error:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)
	... 21 more
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:96)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:291)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
	at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
	at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
	... 23 more


Somehow, it still tries to use EMRFS (which may be valid?), but it fails to initialize. I don't know enough about EMRFS/S3 interop to know how to diagnose it further.
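One hedged way to dig further: a java.lang.NoSuchMethodError at runtime usually means the class that was actually loaded is a different version from the one the caller was compiled against. Here EmrFileSystem.initialize expects Configuration.addResource(Configuration), so a useful check is whether the Configuration class on the failing process's classpath has that method, and which jar it was loaded from. The MethodProbe class below is a hypothetical, self-contained sketch using only standard Java reflection; it assumes you run it with the same classpath as the Flink JobManager, which is not shown in this thread.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.security.CodeSource;

/**
 * Hypothetical diagnostic sketch for a NoSuchMethodError: checks whether the
 * class visible on the current classpath has a given public method, and
 * reports which jar or directory (code source) the class was loaded from.
 */
public class MethodProbe {

    /**
     * True if className is loadable and has a public method (declared or inherited)
     * with the given parameter types. Parameter types must be class names, not primitives.
     */
    public static boolean hasMethod(String className, String methodName, String... paramTypeNames) {
        try {
            Class<?> cls = Class.forName(className);
            Class<?>[] params = new Class<?>[paramTypeNames.length];
            for (int i = 0; i < paramTypeNames.length; i++) {
                params[i] = Class.forName(paramTypeNames[i]);
            }
            Method m = cls.getMethod(methodName, params);
            return m != null;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    /** Location the class was loaded from, or a marker if it is a bootstrap class or missing. */
    public static String loadedFrom(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            URL loc = (src == null) ? null : src.getLocation();
            return (loc == null) ? "<bootstrap or unknown>" : loc.toString();
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        String conf = "org.apache.hadoop.conf.Configuration";
        // Signature from the stack trace: addResource(Lorg/apache/hadoop/conf/Configuration;)V
        System.out.println("addResource(Configuration) present: "
                + hasMethod(conf, "addResource", conf));
        System.out.println("Configuration loaded from: " + loadedFrom(conf));
    }
}
```

If the probe reports the method as missing, or shows Configuration coming from a jar other than the cluster's Hadoop installation, that would point to a Hadoop version mismatch between Flink's classpath and what EMRFS was built against.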

I'm running Flink 1.0.0 compiled for Scala 2.11.

Any advice on how to make it work is highly appreciated.


Thanks,

Timur




--------------------------
Ken Krugler
+1 530-210-6378
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr