Hello,
I am trying to set up Apache Zeppelin with a Flink cluster (one JobManager, one TaskManager). I used the Dockerfiles in flink-contrib/docker-flink, changing the Flink version to 1.0.3 to match Zeppelin's Flink version:

https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/Dockerfile

I built another Docker image around the latest binary release of Zeppelin (with all interpreters), and I reconfigured the Flink interpreter as follows:

connect to existing process
host: jobmanager, port: 6123
I removed all other properties
When I try to submit a Flink job, I get an error state and the following exception appears in the log (nothing appears in the JobManager log):

ERROR [2016-08-23 11:44:57,932] ({Thread-16} JobProgressPoller.java[run]:54) - Can not get or update progress
org.apache.zeppelin.interpreter.InterpreterException: org.apache.thrift.transport.TTransportException
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(RemoteInterpreter.java:373)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getProgress(LazyOpenInterpreter.java:111)
    at org.apache.zeppelin.notebook.Paragraph.progress(Paragraph.java:237)
    at org.apache.zeppelin.scheduler.JobProgressPoller.run(JobProgressPoller.java:51)
Caused by: org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getProgress(RemoteInterpreterService.java:296)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getProgress(RemoteInterpreterService.java:281)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(RemoteInterpreter.java:370)
    ... 3 more

Flink in local mode works fine on Zeppelin.
Could somebody point me to what I'm doing wrong?

Thanks a lot!
Frank
Hello,

For reference: I already found out that "connect to existing process" was my error here. It means connecting to an existing Zeppelin interpreter process, not to an existing Flink cluster. After fixing that, I'm now in the same situation as described here:

https://stackoverflow.com/questions/38688277/flink-zeppelin-not-responding

I guess it's more a Zeppelin problem than a Flink problem, though: I see both the interpreter JVM and the main Zeppelin JVM waiting on Thrift input, so it seems they are waiting for each other.

Greetings,
Frank
Hi!
There are some people familiar with the Zeppelin integration; CCing Till and Trevor. Otherwise, you could also send this to the Zeppelin community.

Cheers,
Max
Hey Frank,

Saw your post on the Zeppelin list yesterday. I can look at it later this morning, but my gut feeling is that a ghost Zeppelin daemon is running in the background and its local Flink is holding port 6123. This is fairly common and would explain the issue. I don't know if you're on Linux, Windows or something else, but have you tried rebooting the machine? (Sorry if you said you did higher up in the email.)

Also, I very vaguely remember that the boot order of Flink and Zeppelin matters: you need to start Flink first and then Zeppelin, or vice versa. I feel like it is Flink first, then Zeppelin.

Hope that helps; will dig in later if not.

tg

Trevor Grant
Data Scientist

"Fortunate is he, who is able to know the causes of things." -Virgil
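To check the ghost-daemon theory, one could look for leftover Zeppelin JVMs in the process list. Below is a minimal POSIX sh sketch; the function name `filter_zeppelin_procs` is hypothetical, and it only assumes that the main server and the interpreter processes run the `org.apache.zeppelin.server.ZeppelinServer` and `org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer` classes, as in the `ps auxw` output later in this thread.

```shell
# Hypothetical helper: read a process listing on stdin and keep only the
# lines that look like Zeppelin JVMs (main server or remote interpreter).
# The escaped dots mean grep's own command line in the same listing does
# not match the pattern.
filter_zeppelin_procs() {
  grep -E 'org\.apache\.zeppelin\.(server\.ZeppelinServer|interpreter\.remote\.RemoteInterpreterServer)' || true
}
```

Usage, inside the Zeppelin container: `ps auxw | filter_zeppelin_procs`. More server processes than expected, or interpreter processes nobody started, would hint at a stale daemon; the process list alone does not show which process holds port 6123.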
Frank, can you post the Zeppelin Flink log, please? You can probably find it in zeppelin_dir/logs/*flink*.log.

You've got a few moving pieces here. I've never run Zeppelin against Flink in a Docker container, but I think the Zeppelin-Flink log is the first place to look.

You say you can't get Zeppelin to work in local mode either, right? Just curious: is Zeppelin running in Docker too?

Thanks,
tg
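A quick way to follow this suggestion is to pull only the error lines out of the Flink interpreter log. This is a sketch assuming the logs sit in `<zeppelin_dir>/logs` and match `*flink*.log`, as described above; `show_flink_log_errors` is a hypothetical helper name.

```shell
# Sketch: print ERROR/Exception/Error lines, prefixed with file name and line
# number, from every *flink*.log under "$1/logs". Returns 1 if no such log exists.
show_flink_log_errors() {
  zeppelin_dir=$1
  found=0
  for log in "$zeppelin_dir"/logs/*flink*.log; do
    [ -f "$log" ] || continue   # the glob matched nothing
    found=1
    # /dev/null as a second file forces grep to prefix matches with the file name
    grep -n -E 'ERROR|Exception|Error' "$log" /dev/null
  done
  if [ "$found" -eq 0 ]; then
    echo "no *flink*.log under $zeppelin_dir/logs" >&2
    return 1
  fi
}
```

For the image in this thread that would be `show_flink_log_errors /opt/zeppelin`. Errors that the interpreter process prints straight to stdout may not appear in these files; for a container, the `docker logs` output is the place to look for those.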
Hello Trevor,

Thanks for your suggestion. It sounds plausible, as I always start Zeppelin first and then change the configuration from local to remote. However, port 6123 doesn't seem to be open locally.

The log does not explain a lot: on the Flink side I don't see anything at all; on the Zeppelin side I see this:

==> zeppelin--94490c51d71e.log <==
 INFO [2016-08-25 12:53:24,168] ({qtp846063400-48} InterpreterFactory.java[createInterpretersForNote]:576) - Create interpreter instance flink for note 2BW8NMCKW
 INFO [2016-08-25 12:53:24,168] ({qtp846063400-48} InterpreterFactory.java[createInterpretersForNote]:606) - Interpreter org.apache.zeppelin.flink.FlinkInterpreter 795344042 created
 INFO [2016-08-25 12:53:24,169] ({pool-1-thread-3} SchedulerFactory.java[jobStarted]:131) - Job paragraph_1471964818018_1833520437 started by scheduler org.apache.zeppelin.interpreter.remote.RemoteInterpretershared_session513606587
 INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3} Paragraph.java[jobRun]:252) - run paragraph 20160823-150658_99117457 using null org.apache.zeppelin.interpreter.LazyOpenInterpreter@2f67fcaa
 INFO [2016-08-25 12:53:24,170] ({pool-1-thread-3} RemoteInterpreterProcess.java[reference]:148) - Run interpreter process [/opt/zeppelin/bin/interpreter.sh, -d, /opt/zeppelin/interpreter/flink, -p, 45769, -l, /opt/zeppelin/local-repo/2BVEQGGEN]
 INFO [2016-08-25 12:53:24,672] ({pool-1-thread-3} RemoteInterpreter.java[init]:170) - Create remote interpreter org.apache.zeppelin.flink.FlinkInterpreter

After running %flink, I see this in ps auxw:

/usr/lib/jvm/java-1.8-openjdk/bin/java -Dfile.encoding=UTF-8 -Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties -Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin-interpreter-flink--94490c51d71e.log -Xms1024m -Xmx1024m -XX:MaxPermSize=512m -cp ::/opt/zeppelin/interpreter/flink/*::/opt/zeppelin/lib/zeppelin-interpreter-0.6.1.jar org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer 45769
/usr/lib/jvm/java-1.8-openjdk/bin/java -Dfile.encoding=UTF-8 -Xms1024m -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties -Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin--94490c51d71e.log -cp ::/opt/zeppelin/lib/*:/opt/zeppelin/*::/opt/zeppelin/conf org.apache.zeppelin.server.ZeppelinServer

The stdout of the Zeppelin Flink interpreter process after triggering a stack dump looks like this (note the exception; I didn't notice it before):

zeppelin_1 | Exception in thread "pool-1-thread-2" java.lang.NoClassDefFoundError: scala/collection/Seq
zeppelin_1 |     at java.lang.Class.forName0(Native Method)
zeppelin_1 |     at java.lang.Class.forName(Class.java:264)
zeppelin_1 |     at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.createInterpreter(RemoteInterpreterServer.java:148)
zeppelin_1 |     at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Processor$createInterpreter.getResult(RemoteInterpreterService.java:1409)
zeppelin_1 |     at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Processor$createInterpreter.getResult(RemoteInterpreterService.java:1394)
zeppelin_1 |     at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
zeppelin_1 |     at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
zeppelin_1 |     at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
zeppelin_1 |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
zeppelin_1 |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
zeppelin_1 |     at java.lang.Thread.run(Thread.java:745)
zeppelin_1 | Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
zeppelin_1 |     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
zeppelin_1 |     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
zeppelin_1 |     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
zeppelin_1 |     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
zeppelin_1 |     ... 11 more
zeppelin_1 | 2016-08-25 12:56:03
zeppelin_1 | Full thread dump OpenJDK 64-Bit Server VM (25.92-b14 mixed mode):
zeppelin_1 |
zeppelin_1 | "pool-1-thread-5" #15 prio=5 os_prio=0 tid=0x00005567976e8000 nid=0x108 waiting on condition [0x00007fa83ca8d000]
zeppelin_1 |    java.lang.Thread.State: WAITING (parking)
zeppelin_1 |     at sun.misc.Unsafe.park(Native Method)
zeppelin_1 |     - parking to wait for <0x00000000ebc3dae0> (a java.util.concurrent.SynchronousQueue$TransferStack)
zeppelin_1 |     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
zeppelin_1 |     at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
zeppelin_1 |     at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
zeppelin_1 |     at java.util.concurrent.SynchronousQueue.take(SynchronousQueue.java:924)
zeppelin_1 |     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
zeppelin_1 |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
zeppelin_1 |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
zeppelin_1 |     at java.lang.Thread.run(Thread.java:745)
zeppelin_1 |
zeppelin_1 | "pool-1-thread-4" #14 prio=5 os_prio=0 tid=0x0000556797874800 nid=0x107 in Object.wait() [0x00007fa83cb8d000]
zeppelin_1 |    java.lang.Thread.State: TIMED_WAITING (on object monitor)
zeppelin_1 |     at java.lang.Object.wait(Native Method)
zeppelin_1 |     - waiting on <0x00000000eba208c0> (a java.util.LinkedList)
zeppelin_1 |     at org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient.pollEvent(RemoteInterpreterEventClient.java:200)
zeppelin_1 |     - locked <0x00000000eba208c0> (a java.util.LinkedList)
zeppelin_1 |     at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.getEvent(RemoteInterpreterServer.java:543)
zeppelin_1 |     at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Processor$getEvent.getResult(RemoteInterpreterService.java:1610)
zeppelin_1 |     at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Processor$getEvent.getResult(RemoteInterpreterService.java:1595)
zeppelin_1 |     at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
zeppelin_1 |     at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
zeppelin_1 |     at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
zeppelin_1 |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
zeppelin_1 |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
zeppelin_1 |     at java.lang.Thread.run(Thread.java:745)
zeppelin_1 |
zeppelin_1 | "pool-1-thread-3" #13 prio=5 os_prio=0 tid=0x0000556797871800 nid=0x106 runnable [0x00007fa83cc8e000]
zeppelin_1 |    java.lang.Thread.State: RUNNABLE
zeppelin_1 |     at java.net.SocketInputStream.socketRead0(Native Method)
zeppelin_1 |     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
zeppelin_1 |     at java.net.SocketInputStream.read(SocketInputStream.java:170)
zeppelin_1 |     at java.net.SocketInputStream.read(SocketInputStream.java:141)
zeppelin_1 |     at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
zeppelin_1 |     at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
zeppelin_1 |     at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
zeppelin_1 |     - locked <0x00000000ebfb7be0> (a java.io.BufferedInputStream)
zeppelin_1 |     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
zeppelin_1 |     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
zeppelin_1 |     at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
zeppelin_1 |     at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
zeppelin_1 |     at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
zeppelin_1 |     at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
zeppelin_1 |     at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
zeppelin_1 |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
zeppelin_1 |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
zeppelin_1 |     at java.lang.Thread.run(Thread.java:745)

The local port 6123 is closed (it is open on the JobManager, though):

bash-4.3$ telnet localhost 6123
telnet: can't connect to remote host (127.0.0.1): Connection refused

Thanks!

Greetings,
Frank
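The `NoClassDefFoundError: scala/collection/Seq` above suggests the Flink interpreter's classpath lacks the Scala library. One rough way to confirm this without a JDK in the image is to search the jars in the interpreter directory for the class entry. This is a heuristic sketch: it relies on zip archives (and therefore jars) storing entry names uncompressed, and `jars_with_class` is a hypothetical name; `unzip -l` or `jar tf` give an exact listing where those tools are available.

```shell
# Heuristic sketch: list jars in directory $1 whose bytes contain the entry
# name for class $2 (e.g. scala.collection.Seq -> scala/collection/Seq.class).
jars_with_class() {
  dir=$1
  entry=$(printf '%s' "$2" | tr . /).class
  for jar in "$dir"/*.jar; do
    [ -f "$jar" ] || continue   # the glob matched nothing
    # Fixed-string byte search; zip headers hold entry names uncompressed.
    grep -q -F "$entry" "$jar" && printf '%s\n' "$jar"
  done
  return 0
}
```

For example, `jars_with_class /opt/zeppelin/interpreter/flink scala.collection.Seq`: empty output there, while another interpreter directory lists a `scala-library` jar, would be consistent with the error above.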
Hello,

For reference, below is the Dockerfile I used to build the Zeppelin image (basically just OpenJDK 8 with the latest binary release of Zeppelin). The docker-entrypoint.sh script just starts zeppelin.sh (a one-liner).

FROM openjdk:alpine

RUN apk add --no-cache bash snappy

ARG ZEPPELIN_VERSION=0.6.1
ARG INSTALL_PATH=/opt
ENV APP_HOME $INSTALL_PATH/zeppelin
ENV PATH $PATH:$APP_HOME/bin

RUN set -x && \
    mkdir -p $INSTALL_PATH && \
    apk --update add --virtual build-dependencies curl && \
    curl -s $(curl -s https://www.apache.org/dyn/closer.cgi\?preferred\=true)/zeppelin/zeppelin-$ZEPPELIN_VERSION/zeppelin-$ZEPPELIN_VERSION-bin-all.tgz | \
    tar xvz -C $INSTALL_PATH && \
    ln -s $INSTALL_PATH/zeppelin-$ZEPPELIN_VERSION-bin-all $APP_HOME && \
    addgroup -S zeppelin && adduser -D -S -H -G zeppelin -h $APP_HOME zeppelin && \
    chown -R zeppelin:zeppelin $INSTALL_PATH && \
    chown -h zeppelin:zeppelin $APP_HOME && \
    apk del build-dependencies && \
    rm -rf /var/cache/apk/*

# Configure container
USER zeppelin
ADD docker-entrypoint.sh $APP_HOME/bin/
ENTRYPOINT ["docker-entrypoint.sh"]
CMD ["sh", "-c"]

Greetings,
Frank
Hello,

Sorry for the spam, but I got it working after copying all Scala libraries from another interpreter into the interpreter/flink directory. So I think the problem is that the Scala libraries are missing from the zeppelin/interpreter/flink/ directory in the binary release.

For now I'm adding the copy commands to the Dockerfile. I'm sure this is not the proper way to fix it, but I don't know Maven well enough to understand why the Scala libs are missing for the Flink interpreter but not for the Ignite interpreter. I'm also unable to figure out why a local interpreter worked fine given the missing libraries...

Greetings,
Frank
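Before copying anything, it may help to see exactly which Scala jars one interpreter directory has and the other lacks. A sketch that compares by file name only (it does not check whether the versions are compatible with the Flink interpreter); `missing_scala_jars` is a hypothetical helper.

```shell
# Sketch: print names of scala*.jar files present in directory $1
# (e.g. the ignite interpreter) but absent from directory $2 (e.g. flink).
missing_scala_jars() {
  src=$1 dst=$2
  for jar in "$src"/scala*.jar; do
    [ -f "$jar" ] || continue   # the glob matched nothing
    name=$(basename "$jar")
    [ -e "$dst/$name" ] || printf '%s\n' "$name"
  done
  return 0
}
```

With the paths from this thread: `missing_scala_jars /opt/zeppelin/interpreter/ignite /opt/zeppelin/interpreter/flink`.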
I'm glad you were able to work it out! Your setup is somewhat unique, and since Zeppelin is the result of many drive-by commits, interesting and unexpected things happen in the tail cases.

Could you please report your problem and solution on the Zeppelin user list? What you've discovered may in fact be a bug or a regression caused by some of the recent Spark 2.0/Scala 2.11 mess (I see you installed Zeppelin 0.6.1). Suffice to say, I don't think this is a Flink issue.

Finally, out of curiosity: which jars did you copy to the interpreter/flink directory to get this to work? I'd like to check zeppelin/flink/pom.xml.

Happy to be a sounding board if nothing else ;)

tg
Hello,

I added this to my Dockerfile to end up with a working setup:

RUN cp /opt/zeppelin/interpreter/ignite/scala*jar /opt/zeppelin/interpreter/flink/

which copies:

scala-compiler-2.11.7.jar
scala-library-2.11.7.jar
scala-parser-combinators_2.11-1.0.4.jar
scala-reflect-2.11.7.jar
scala-xml_2.11-1.0.4.jar

Actually, "working" means "able to run the word count example" (I'm not sure that really qualifies as working...). I'll follow up on this on the Zeppelin user list.

Frank
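The one-line `cp` works, but a slightly more defensive variant could print a clearer message when the source glob matches nothing and leave untouched any jar the target directory already ships. A sketch under those assumptions; `copy_scala_jars` is a hypothetical name, and this is still the same workaround rather than a fix in Zeppelin's build.

```shell
# Sketch: copy scala*.jar from $1 into $2 without overwriting existing files.
# Fails with a message if $1 contains no scala*.jar.
copy_scala_jars() {
  src=$1 dst=$2 copied=0
  set -- "$src"/scala*.jar
  if [ ! -e "$1" ]; then          # unmatched glob stays literal
    echo "no scala*.jar in $src" >&2
    return 1
  fi
  for jar in "$@"; do
    name=$(basename "$jar")
    if [ ! -e "$dst/$name" ]; then
      cp "$jar" "$dst/" || return 1
      copied=$((copied + 1))
    fi
  done
  echo "copied $copied jar(s) into $dst"
}
```

Called as `copy_scala_jars /opt/zeppelin/interpreter/ignite /opt/zeppelin/interpreter/flink`, for instance from a small script invoked by a `RUN` step.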
That is a regression from upgrading Zeppelin to Spark 2.0/Scala 2.11. As it broke existing functionality, hopefully whoever did the upgrade will fix it... Please report it to Zeppelin. Thanks, and good find!