Hello Flink Team,

We at Fraunhofer IESE are evaluating Flink for a project, and I'm a bit frustrated at the moment.

I've written a few test cases with the Flink API and want to deploy them to a Flink EC2 cluster. I set up the cluster using the Karamel recipe presented in the following video:

https://www.google.de/url?sa=t&rct=j&q=&esrc=s&source=video&cd=1&cad=rja&uact=8&ved=0CDIQtwIwAGoVChMIy86Tq6rQyAIVR70UCh0IRwuJ&url=http%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3Dm_SkhyMV0to&usg=AFQjCNGKUzFv521yg-OTy-1XqS2-rbZKug&bvm=bv.105454873,d.bGg

The setup works fine and the hello-flink app runs. Afterwards, however, I wanted to copy some data from an S3 bucket to the local HDFS cluster on EC2.

hadoop fs -ls s3n.... works, as does cat, ... But when I try to copy the data with distcp, the command freezes and does not respond until it times out.

After trying a few things I gave up and tried another approach: access the S3 bucket directly with Flink and import the data with a small Flink program that just reads from S3 and writes to the local Hadoop cluster. This works fine locally, but on the cluster the S3NFileSystem class is missing (ClassNotFoundException), although it is included in the jar file of the installation.

I forked the Chef recipe and updated it to Flink 0.9.1, but the issue remains.

Is there another simple script to install Flink with Hadoop on an EC2 cluster with a working s3n file system?

Freelancer on behalf of Fraunhofer IESE Kaiserslautern

Best regards
Thomas Götzinger
Freelance Computer Scientist
Glockenstraße 2a, D-66882 Hütschenhausen OT Spesbach
Mobile: +49 (0)176 82180714, Homezone: +49 (0) 6371 735083, Private: +49 (0) 6371 954050, epost: [hidden email]
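One way to narrow down a ClassNotFoundException like the one described above is to ask a JVM directly whether the class is visible and which jar it was loaded from. The following is a minimal sketch and is not part of Flink or Hadoop. The class name ClassProbe and its output format are hypothetical. It uses only the standard Class.forName and ProtectionDomain APIs. It only says something useful about a Flink process (JobManager or TaskManager) if it is run with that same classpath, for example with the jars from the lib/ directory of the Flink installation.

```java
import java.security.CodeSource;

/**
 * Hypothetical diagnostic (not part of Flink or Hadoop): reports whether a class
 * is visible to this JVM's class loader and where it was loaded from.
 */
public class ClassProbe {

    /**
     * Returns the jar or directory the class was loaded from, "<bootstrap>" for
     * JDK classes without a code source, a short error description if the class
     * exists but cannot be linked, or null if the class is not on the classpath.
     */
    static String locate(String className) {
        try {
            // initialize = false: load the class without running its static initializers
            Class<?> clazz = Class.forName(className, false, ClassProbe.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            return source == null ? "<bootstrap>" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException e) {
            return null;
        } catch (LinkageError e) {
            // e.g. NoClassDefFoundError: the class was found but one of its dependencies is missing
            return "found but not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // Default to the class from this thread if no class names are given on the command line
        String[] names = args.length > 0
                ? args
                : new String[] {"org.apache.hadoop.fs.s3native.NativeS3FileSystem"};
        for (String name : names) {
            String where = locate(name);
            System.out.println(name + " -> " + (where == null ? "NOT FOUND" : where));
        }
    }
}
```

For example, from the Flink installation directory, java -cp "lib/*:." ClassProbe prints either the jar that provides NativeS3FileSystem or NOT FOUND.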
Hi Thomas,

until recently, Flink provided its own implementation of an S3 FileSystem, which wasn't fully tested and was buggy. We removed that implementation, and Flink now uses Hadoop's S3 implementation by default (in 0.10-SNAPSHOT).

If you want to continue using 0.9.1, you can configure Flink to use Hadoop's implementation. See this answer on StackOverflow and the linked email thread [1].
If you switch to the 0.10-SNAPSHOT version (which will be released in a few days as 0.10.0), things become a bit easier, because Hadoop's implementation is used by default. The documentation shows how to configure your access keys [2].

Please don't hesitate to ask if something is unclear or not working.

Best, Fabian

[1] http://stackoverflow.com/questions/32959790/run-apache-flink-with-amazon-s3
[2] https://ci.apache.org/projects/flink/flink-docs-master/apis/example_connectors.html

2015-10-29 9:35 GMT+01:00 Thomas Götzinger <[hidden email]>:
Hi Thomas,

Try switching to EMR AMI 3.5 and registering Hadoop's S3 FileSystem instead of the one packaged with Flink.

On Oct 29, 2015 4:36 AM, "Thomas Götzinger" <[hidden email]> wrote:
Hi Fabian,

thanks for your reply. I use a Karamel recipe to install Flink on EC2. Currently I am using flink-0.9.1-bin-hadoop24.tgz, and the NativeS3FileSystem is included in that file. First I tried the standard Karamel recipe on GitHub, hopshadoop/flink-chef, but it is on version 0.9.0 and the S3NFileSystem is not included. So I forked the GitHub project to goetzingert/flink-chef. Although the class file is included, the application throws a ClassNotFoundException for the class above.

In my project I added conf/core-site.xml:

    <configuration>
      <property>
        <name>fs.s3n.impl</name>
        <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
      </property>
      <property>
        <name>fs.s3n.awsAccessKeyId</name>
        <value>...</value>
      </property>
      <property>
        <name>fs.s3n.awsSecretAccessKey</name>
        <value>...</value>
      </property>
    </configuration>

I also tried the programmatic configuration:

    XMLConfiguration config = new XMLConfiguration(configPath);

    env = ExecutionEnvironment.getExecutionEnvironment();
    Configuration configuration = GlobalConfiguration.getConfiguration();
    configuration.setString("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
    configuration.setString("fs.s3n.awsAccessKeyId", "...");
    configuration.setString("fs.s3n.awsSecretAccessKey", "...");
    configuration.setString("fs.hdfs.hdfssite", Template.class.getResource("/conf/core-site.xml").toString());
    GlobalConfiguration.includeConfiguration(configuration);

Any idea why the class is not on the classpath? Is there another script to set up Flink on an EC2 cluster?

When will Flink 0.10 be released?

Regards
Thomas Götzinger
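The same core-site.xml has to be present on every node of the cluster, so it can help to generate the file rather than edit it by hand on each machine. The sketch below is a hypothetical helper; CoreSiteWriter is not part of Flink or Hadoop. It renders the three s3n properties from the post above into a complete core-site.xml and escapes XML special characters. It assumes the credentials are available in the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and falls back to "..." placeholders otherwise. Where the file has to be placed for Flink to read it depends on the deployment. Typical locations are a directory referenced by HADOOP_CONF_DIR or by fs.hdfs.hadoopconf in flink-conf.yaml.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink or Hadoop): renders Hadoop-style
 * properties as a complete core-site.xml document.
 */
public class CoreSiteWriter {

    /** Escapes XML special characters; '&' must be replaced first. */
    static String escape(String s) {
        return s.replace("&", "&amp;")
                .replace("<", "&lt;")
                .replace(">", "&gt;")
                .replace("\"", "&quot;")
                .replace("'", "&apos;");
    }

    /** Emits one <property> element per entry, in the map's iteration order. */
    static String render(Map<String, String> props) {
        StringBuilder sb = new StringBuilder("<?xml version=\"1.0\"?>\n<configuration>\n");
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append("  <property>\n")
              .append("    <name>").append(escape(e.getKey())).append("</name>\n")
              .append("    <value>").append(escape(e.getValue())).append("</value>\n")
              .append("  </property>\n");
        }
        return sb.append("</configuration>\n").toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the properties in the order they are added
        Map<String, String> props = new LinkedHashMap<>();
        props.put("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        // Assumption: credentials come from these environment variables; "..." is a placeholder
        props.put("fs.s3n.awsAccessKeyId", System.getenv().getOrDefault("AWS_ACCESS_KEY_ID", "..."));
        props.put("fs.s3n.awsSecretAccessKey", System.getenv().getOrDefault("AWS_SECRET_ACCESS_KEY", "..."));
        System.out.print(render(props));
    }
}
```

Running java CoreSiteWriter > core-site.xml on each node (or once, then copying the file) keeps the three properties identical across the cluster.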
Sorry for the confusion,

the Flink cluster throws the following stack trace:

    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 29a2a25d49aa0706588ccdb8b7e81c6c (Flink Java Job at Sun Nov 08 18:50:52 UTC 2015)
        at org.apache.flink.client.program.Client.run(Client.java:413)
        at org.apache.flink.client.program.Client.run(Client.java:356)
        at org.apache.flink.client.program.Client.run(Client.java:349)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
        at de.fraunhofer.iese.proopt.Template.run(Template.java:112)
        at de.fraunhofer.iese.proopt.Main.main(Main.java:8)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
        at org.apache.flink.client.program.Client.run(Client.java:315)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 29a2a25d49aa0706588ccdb8b7e81c6c (Flink Java Job at Sun Nov 08 18:50:52 UTC 2015)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: No file system found with scheme s3n, referenced in file URI 's3n://big-data-benchmark/pavlo/text/tiny/rankings'.
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
        ... 19 more
    Caused by: java.io.IOException: No file system found with scheme s3n, referenced in file URI 's3n://big-data-benchmark/pavlo/text/tiny/rankings'.
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:247)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:309)
        at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:447)
        at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
        ... 21 more

Best regards
Thomas Götzinger
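The root cause in the trace is the last "Caused by". FileSystem.get fails inside the JobManager (in ExecutionJobVertex, while creating the input splits) because no file system is registered for the scheme s3n. Hadoop 2.x looks up the implementation class for a scheme under the configuration key fs.<scheme>.impl. Assuming Flink resolves unknown schemes through the Hadoop configuration it has loaded, the key that matters for this URI is fs.s3n.impl. The minimal sketch below uses java.net.URI to show how the URI from the trace breaks down and which key it maps to; SchemeKey is a hypothetical name. Note that the programmatic attempt earlier in the thread sets fs.s3.impl. That key would match s3:// URIs, not s3n:// ones.

```java
import java.net.URI;

/**
 * Hypothetical illustration (not part of Flink or Hadoop): splits a file URI into
 * its parts and derives the Hadoop 2.x key "fs.<scheme>.impl" for its scheme.
 */
public class SchemeKey {

    static String implKeyFor(String uri) {
        String scheme = URI.create(uri).getScheme();
        if (scheme == null) {
            // A plain path such as "/tmp/data" has no scheme, so no fs.<scheme>.impl key applies
            throw new IllegalArgumentException("URI has no scheme: " + uri);
        }
        return "fs." + scheme + ".impl";
    }

    public static void main(String[] args) {
        String input = "s3n://big-data-benchmark/pavlo/text/tiny/rankings";
        URI uri = URI.create(input);
        System.out.println("scheme   = " + uri.getScheme());    // s3n
        System.out.println("bucket   = " + uri.getAuthority()); // big-data-benchmark
        System.out.println("path     = " + uri.getPath());      // /pavlo/text/tiny/rankings
        System.out.println("impl key = " + implKeyFor(input));  // fs.s3n.impl
    }
}
```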
Hi Thomas,
It appears Flink couldn't pick up the Hadoop configuration. Did you set the environment variables HADOOP_CONF_DIR or HADOOP_HOME?

Best,
Max

On Sun, Nov 8, 2015 at 7:52 PM, Thomas Götzinger <[hidden email]> wrote:
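To follow up on Max's question, a quick check is to print, on each node, which directories HADOOP_CONF_DIR and HADOOP_HOME point to and whether a core-site.xml is present there. The check should run in the same environment the Flink processes are started from. The sketch below is hypothetical; HadoopConfProbe is not a Flink class. It checks $HADOOP_CONF_DIR, then $HADOOP_HOME/conf (Hadoop 1 layout), then $HADOOP_HOME/etc/hadoop (Hadoop 2 layout). That list is an assumption about where a Hadoop configuration usually lives, not a description of Flink's exact lookup order.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical diagnostic (not part of Flink): lists directories that might hold
 * core-site.xml, derived from HADOOP_CONF_DIR and HADOOP_HOME.
 */
public class HadoopConfProbe {

    static List<String> candidateDirs(Map<String, String> env) {
        List<String> dirs = new ArrayList<>();
        String confDir = env.get("HADOOP_CONF_DIR");
        if (confDir != null && !confDir.isEmpty()) {
            dirs.add(confDir);
        }
        String home = env.get("HADOOP_HOME");
        if (home != null && !home.isEmpty()) {
            dirs.add(home + "/conf");        // Hadoop 1.x layout
            dirs.add(home + "/etc/hadoop");  // Hadoop 2.x layout
        }
        return dirs;
    }

    public static void main(String[] args) {
        List<String> dirs = candidateDirs(System.getenv());
        if (dirs.isEmpty()) {
            System.out.println("Neither HADOOP_CONF_DIR nor HADOOP_HOME is set in this environment.");
        }
        for (String dir : dirs) {
            File coreSite = new File(dir, "core-site.xml");
            System.out.println(coreSite.getPath() + (coreSite.isFile() ? "  [found]" : "  [missing]"));
        }
    }
}
```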
But the default WordCount example, in which Flink accesses Hadoop, does run. Or is that something different?

On 09.11.2015 11:54, "Maximilian Michels" <[hidden email]> wrote:
Hi Thomas,

I'm sorry that nobody responded after that. As you've probably noticed, there is a lot of traffic on the mailing lists and sometimes things get lost. Were you able to get the S3 file system running with Flink? If not, let's try to figure out why it is not picking up the config correctly.

On Mon, Nov 9, 2015 at 2:41 PM, Thomas Götzinger <[hidden email]> wrote: