Hi Ana,
good to hear that you found the logging statements. You can check in Yarn’s web interface whether there are still occupied containers. Alternatively, you can go to the different machines and run jps, which lists the running Java processes. If you see an ApplicationMaster or YarnTaskManagerRunner process, then there is still a container running Flink on this machine. I hope this helps you.
Cheers,
Till
Hi Till,
Thanks for that! I can see the "Logger in LineSplitter.flatMap" output if I retrieve the task manager logs manually (under /var/log/hadoop-yarn/containers/application_X/…). However, that solution is not ideal when, for instance, I am using 32 machines for my map/reduce operations.
I would like to know why Yarn’s log aggregation is not working. Can you tell me how to check whether there are still Yarn containers running after the Flink job has finished? I have tried "hadoop job -list" but I cannot see any jobs there, although I am not sure that means there are no containers running...
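For the YARN side specifically, a hedged sketch of how this could be checked from the command line (the application id is the one from this thread; both subcommands exist in Hadoop 2.6):

```shell
# List YARN applications still running; a finished Flink job
# should no longer appear here.
yarn application -list -appStates RUNNING

# Show the status of a specific application, including its final state.
yarn application -status application_1452250761414_0005
```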
Thanks,
Ana
On 08 Jan 2016, at 16:24, Till Rohrmann <[hidden email]> wrote:
You’re right that the log statements of the LineSplitter are in the logs of the cluster nodes, because that’s where the LineSplitter code is executed. In contrast, you create a TestClass on the client when you submit the program. Therefore, you see the logging statement “Logger in TestClass” on the command line or in the CLI log file.
So I would assume that the problem is Yarn’s log aggregation. Either your configuration is not correct or there are still some Yarn containers running after the Flink job has finished. Yarn will only show you the logs after all containers have terminated. Maybe you could check that. Alternatively, you can try to retrieve the taskmanager logs manually by going to the machine where your yarn container was executed. Then, under hadoop/logs/userlogs, you should find the logs.
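A sketch of that manual retrieval (the log root differs per distribution and per yarn.nodemanager.log-dirs setting; the two directories below are common defaults, not guaranteed paths):

```shell
# On each worker node, look for the container logs of the application.
APP=application_1452250761414_0005
for dir in "$HOME/hadoop/logs/userlogs" /var/log/hadoop-yarn/containers; do
  # Print any log files found for this application under either root.
  [ -d "$dir/$APP" ] && find "$dir/$APP" -name '*.log'
done
```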
Cheers,
Till
On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <[hidden email]> wrote:
Thanks for the tip Robert! It was a good idea to rule out other possible causes, but I am afraid that is not the problem. If we stick to the WordCountExample (for simplicity), the Exception is thrown if placed into the flatMap function.
I am going to try to re-write my problem and all the settings below:
When I try to aggregate all logs:
$ yarn logs -applicationId application_1452250761414_0005
the following message is retrieved:
16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
/var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005 does not exist.
Log aggregation has not completed or is not enabled.
(I tried the same command a few minutes later and got the same message, so might it be that log aggregation is not properly enabled?)
I am going to carefully enumerate all the steps I have followed (and settings) to see if someone can identify why the Logger messages from CORE nodes (in an Amazon cluster) are not shown.
1) Set the yarn.log-aggregation-enable property to true in /etc/alternatives/hadoop-conf/yarn-site.xml.
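For reference, the relevant yarn-site.xml fragment (a sketch; the property name is the standard Hadoop one, and only this setting is shown — the rest of the file is unchanged):

```xml
<!-- Fragment of yarn-site.xml: enable YARN log aggregation. -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```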
2) Include log messages in my WordCountExample as follows:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class WordCountExample {
static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
logger.info("Entering application.");
DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
List<Integer> elements = new ArrayList<Integer>();
elements.add(0);
DataSet<TestClass> set = env.fromElements(new TestClass(elements));
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.withBroadcastSet(set, "set")
.groupBy(0)
.sum(1);
wordCounts.writeAsText("output.txt", FileSystem.WriteMode.OVERWRITE);
}
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
loggerLineSplitter.info("Logger in LineSplitter.flatMap");
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
//throw new RuntimeException("LineSplitter class called");
}
}
}
public static class TestClass implements Serializable {
private static final long serialVersionUID = -2932037991574118651L;
static Logger loggerTestClass = LoggerFactory.getLogger("TestClass.class");
List<Integer> integerList;
public TestClass(List<Integer> integerList){
this.integerList=integerList;
loggerTestClass.info("Logger in TestClass");
}
}
}
3) Start a yarn-cluster and execute my program with the following command:
$./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar
4) The output in the log folder is as follows:
13:31:04,945 INFO org.apache.flink.client.CliFrontend - --------------------------------------------------------------------------------
13:31:04,947 INFO org.apache.flink.client.CliFrontend - Starting Command Line Client (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)
13:31:04,947 INFO org.apache.flink.client.CliFrontend - Current user: hadoop
13:31:04,947 INFO org.apache.flink.client.CliFrontend - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.65-b01
13:31:04,947 INFO org.apache.flink.client.CliFrontend - Maximum heap size: 3344 MiBytes
13:31:04,947 INFO org.apache.flink.client.CliFrontend - JAVA_HOME: /etc/alternatives/jre
13:31:04,950 INFO org.apache.flink.client.CliFrontend - Hadoop version: 2.6.0
13:31:04,950 INFO org.apache.flink.client.CliFrontend - JVM Options:
13:31:04,950 INFO org.apache.flink.client.CliFrontend - -Dlog.file=/home/hadoop/flink-0.10.0/log/flink-hadoop-client-ip-172-31-33-221.log
13:31:04,950 INFO org.apache.flink.client.CliFrontend - -Dlog4j.configuration=file:/home/hadoop/flink-0.10.0/conf/log4j-cli.properties
13:31:04,950 INFO org.apache.flink.client.CliFrontend - -Dlogback.configurationFile=file:/home/hadoop/flink-0.10.0/conf/logback.xml
13:31:04,951 INFO org.apache.flink.client.CliFrontend - Program Arguments:
13:31:04,951 INFO org.apache.flink.client.CliFrontend - run
13:31:04,951 INFO org.apache.flink.client.CliFrontend - -m
13:31:04,951 INFO org.apache.flink.client.CliFrontend - yarn-cluster
13:31:04,951 INFO org.apache.flink.client.CliFrontend - -yn
13:31:04,951 INFO org.apache.flink.client.CliFrontend - 1
13:31:04,951 INFO org.apache.flink.client.CliFrontend - -ys
13:31:04,951 INFO org.apache.flink.client.CliFrontend - 4
13:31:04,951 INFO org.apache.flink.client.CliFrontend - -yjm
13:31:04,951 INFO org.apache.flink.client.CliFrontend - 1024
13:31:04,951 INFO org.apache.flink.client.CliFrontend - -ytm
13:31:04,951 INFO org.apache.flink.client.CliFrontend - 1024
13:31:04,952 INFO org.apache.flink.client.CliFrontend - -c
13:31:04,952 INFO org.apache.flink.client.CliFrontend - eu.amidst.flinklink.examples.WordCountExample
13:31:04,952 INFO org.apache.flink.client.CliFrontend - ../flinklink.jar
13:31:04,952 INFO org.apache.flink.client.CliFrontend - --------------------------------------------------------------------------------
13:31:04,954 INFO org.apache.flink.client.CliFrontend - Using configuration directory /home/hadoop/flink-0.10.0/conf
13:31:04,954 INFO org.apache.flink.client.CliFrontend - Trying to load configuration file
13:31:05,193 INFO org.apache.flink.client.CliFrontend - Running 'run' command.
13:31:05,201 INFO org.apache.flink.client.CliFrontend - Building program from JAR file
13:31:05,326 INFO org.apache.flink.client.CliFrontend - YARN cluster mode detected. Switching Log4j output to console
13:31:05,385 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
13:31:05,534 INFO org.apache.flink.client.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.FlinkYarnClient to locate the jar
13:31:05,545 INFO org.apache.flink.yarn.FlinkYarnClient - Using values:
13:31:05,547 INFO org.apache.flink.yarn.FlinkYarnClient - TaskManager count = 1
13:31:05,547 INFO org.apache.flink.yarn.FlinkYarnClient - JobManager memory = 1024
13:31:05,547 INFO org.apache.flink.yarn.FlinkYarnClient - TaskManager memory = 1024
13:31:06,112 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/flink-0.10.0/lib/flink-dist-0.10.0.jar to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-dist-0.10.0.jar
13:31:06,843 INFO org.apache.flink.yarn.Utils - Copying from /home/hadoop/flink-0.10.0/conf/flink-conf.yaml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-conf.yaml
13:31:06,857 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/flink-0.10.0/conf/logback.xml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/logback.xml
13:31:06,869 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/flink-0.10.0/conf/log4j.properties to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/log4j.properties
13:31:06,892 INFO org.apache.flink.yarn.FlinkYarnClient - Submitting application master application_1452250761414_0005
13:31:06,917 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1452250761414_0005
13:31:06,917 INFO org.apache.flink.yarn.FlinkYarnClient - Waiting for the cluster to be allocated
13:31:06,919 INFO org.apache.flink.yarn.FlinkYarnClient - Deploying cluster, current state ACCEPTED
13:31:07,920 INFO org.apache.flink.yarn.FlinkYarnClient - Deploying cluster, current state ACCEPTED
13:31:08,922 INFO org.apache.flink.yarn.FlinkYarnClient - Deploying cluster, current state ACCEPTED
13:31:09,924 INFO org.apache.flink.yarn.FlinkYarnClient - Deploying cluster, current state ACCEPTED
13:31:10,925 INFO org.apache.flink.yarn.FlinkYarnClient - YARN application has been deployed successfully.
13:31:10,929 INFO org.apache.flink.yarn.FlinkYarnCluster - Start actor system.
13:31:11,412 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
13:31:11,472 INFO Remoting - Starting remoting
13:31:11,698 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://flink@172.31.33.221:39464]
13:31:11,733 INFO org.apache.flink.yarn.FlinkYarnCluster - Start application client.
13:31:11,737 INFO org.apache.flink.client.CliFrontend - YARN cluster started
13:31:11,737 INFO org.apache.flink.client.CliFrontend - JobManager web interface address http://ip-172-31-33-221.us-west-2.compute.internal:20888/proxy/application_1452250761414_0005/
13:31:11,737 INFO org.apache.flink.client.CliFrontend - Waiting until all TaskManagers have connected
13:31:11,748 INFO org.apache.flink.yarn.ApplicationClient - Notification about new leader address akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,752 INFO org.apache.flink.client.CliFrontend - No status updates from the YARN cluster received so far. Waiting ...
13:31:11,752 INFO org.apache.flink.yarn.ApplicationClient - Received address of new leader akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,753 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager null.
13:31:11,757 INFO org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@172.31.45.98:46965/user/jobmanager.
13:31:12,040 INFO org.apache.flink.yarn.ApplicationClient - Successfully registered at the JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782]
13:31:12,253 INFO org.apache.flink.client.CliFrontend - TaskManager status (0/1)
13:31:12,753 INFO org.apache.flink.client.CliFrontend - TaskManager status (0/1)
13:31:13,254 INFO org.apache.flink.client.CliFrontend - TaskManager status (0/1)
13:31:13,755 INFO org.apache.flink.client.CliFrontend - TaskManager status (0/1)
13:31:14,255 INFO org.apache.flink.client.CliFrontend - TaskManager status (0/1)
13:31:14,756 INFO org.apache.flink.client.CliFrontend - TaskManager status (0/1)
13:31:15,257 INFO org.apache.flink.client.CliFrontend - TaskManager status (0/1)
13:31:15,758 INFO org.apache.flink.client.CliFrontend - TaskManager status (0/1)
13:31:16,258 INFO org.apache.flink.client.CliFrontend - All TaskManagers are connected
13:31:16,264 INFO org.apache.flink.client.program.Client - Starting client actor system
13:31:16,265 INFO org.apache.flink.runtime.client.JobClient - Starting JobClient actor system
13:31:16,283 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
13:31:16,288 INFO Remoting - Starting remoting
13:31:16,301 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:45919]
13:31:16,302 INFO org.apache.flink.runtime.client.JobClient - Started JobClient actor system at 127.0.0.1:45919
13:31:16,302 INFO org.apache.flink.client.CliFrontend - Using the parallelism provided by the remote cluster (4). To use another parallelism, set it at the ./bin/flink client.
13:31:16,302 INFO org.apache.flink.client.CliFrontend - Starting execution of program
13:31:16,303 INFO org.apache.flink.client.program.Client - Starting program in interactive mode
13:31:16,313 INFO eu.amidst.flinklink.examples.WordCountExample - Entering application.
13:31:16,342 INFO TestClass.class - Logger in TestClass
13:31:16,346 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class eu.amidst.flinklink.examples.WordCountExample$TestClass is not a valid POJO type
13:31:16,376 INFO org.apache.flink.client.CliFrontend - Program execution finished
13:31:16,384 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
13:31:16,386 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,408 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
13:31:16,431 INFO org.apache.flink.client.CliFrontend - Shutting down YARN cluster
13:31:16,431 INFO org.apache.flink.yarn.FlinkYarnCluster - Sending shutdown request to the Application Master
13:31:16,432 INFO org.apache.flink.yarn.ApplicationClient - Sending StopYarnSession request to ApplicationMaster.
13:31:16,568 INFO org.apache.flink.yarn.ApplicationClient - Remote JobManager has been stopped successfully. Stopping local application client
13:31:16,570 INFO org.apache.flink.yarn.ApplicationClient - Stopped Application client.
13:31:16,570 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782].
13:31:16,573 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
13:31:16,573 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,584 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
13:31:16,595 INFO org.apache.flink.yarn.FlinkYarnCluster - Deleting files in hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005
13:31:16,596 INFO org.apache.flink.yarn.FlinkYarnCluster - Application application_1452250761414_0005 finished with state FINISHED and final state SUCCEEDED at 1452259876445
13:31:16,747 INFO org.apache.flink.yarn.FlinkYarnCluster - YARN Client is shutting down
You can see the log messages from the WordCountExample and TestClass classes, but I have problems showing the logger message (INFO) in the LineSplitter class. Presumably this is because it is executed on the CORE nodes and not on the MASTER node (it all runs well on my local computer).
Any tips?
Ana
On 06 Jan 2016, at 15:58, Ana M. Martinez <[hidden email]> wrote:
Hi Till,
I am afraid it does not work in any case.
I am following the steps you indicate on your websites (for yarn configuration and loggers with slf4j):
1) Enable log aggregation in yarn-site:
2) Include Loggers as indicated here (see WordCountExample below):
https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html
But I cannot get the log messages that run in the map functions. Am I missing something?
Thanks,
Ana
On 04 Jan 2016, at 14:00, Till Rohrmann <[hidden email]> wrote:
I think the YARN application has to be finished in order for the logs to be accessible.
Judging from your commands, you’re starting a long-running YARN application running Flink with
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
This cluster won’t be used though, because you’re executing your job with
./bin/flink run -m yarn-cluster
which will start another YARN application that is only alive as long as the Flink job is executing. If you want to run your job on the long-running YARN application, then you simply have to omit -m yarn-cluster.
Cheers,
Till
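A sketch contrasting the two submission modes Till describes (commands and jar path are the ones used elsewhere in this thread; the main-class name is Ana's):

```shell
# Mode 1: long-running YARN session. Jobs submitted WITHOUT -m yarn-cluster
# run on this session's already-allocated containers.
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
./bin/flink run -c eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar

# Mode 2: per-job cluster. A fresh YARN application is started for the job
# and torn down when it finishes; YARN log aggregation only completes
# after that teardown.
./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 \
  -c eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar
```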
On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <[hidden email]> wrote:
Hi Till,
Sorry for the delay (Xmas break). I have activated log aggregation in flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find a yarn-site.xml). But the command yarn logs -applicationId application_1451903796996_0008 gives me the following output:
INFO client.RMProxy: Connecting to ResourceManager at xxx
/var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008 does not exist.
Log aggregation has not completed or is not enabled.
I’ve tried to restart the Flink JobManager and TaskManagers as follows:
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
and then, with a detached screen, run my application with ./bin/flink run -m yarn-cluster ...
I am not sure if my problem is that I am not setting the log-aggregation-enable property correctly, or that I am not restarting the Flink JobManager and TaskManagers as I should… Any idea?
Thanks,
Ana
On 18 Dec 2015, at 16:29, Till Rohrmann <[hidden email]> wrote:
In which log file are you exactly looking for the logging statements? And on what machine? You have to look on the machines on which the yarn containers were started. Alternatively, if you have log aggregation activated, then you can simply retrieve the log files via yarn logs.
Cheers,
Till
On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <[hidden email]> wrote:
Hi Till,
Many thanks for your quick response.
I have modified the WordCountExample to reproduce my problem in a simple example.
I run the code below with the following command:
./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c mypackage.WordCountExample ../flinklink.jar
And if I check the log file I see all logger messages except the one in the flatMap function of the inner LineSplitter class, which is actually the one I am most interested in.
Is that an expected behaviour?
Thanks,
Ana
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class WordCountExample {
static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
logger.info("Entering application.");
DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
List<Integer> elements = new ArrayList<Integer>();
elements.add(0);
DataSet<TestClass> set = env.fromElements(new TestClass(elements));
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.withBroadcastSet(set, "set")
.groupBy(0)
.sum(1);
wordCounts.print();
}
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
loggerLineSplitter.info("Logger in LineSplitter.flatMap");
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
public static class TestClass implements Serializable {
private static final long serialVersionUID = -2932037991574118651L;
static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");
List<Integer> integerList;
public TestClass(List<Integer> integerList){
this.integerList=integerList;
loggerTestClass.info("Logger in TestClass");
}
}
}
On 17 Dec 2015, at 16:08, Till Rohrmann <[hidden email]> wrote:
Hi Ana,
you can simply modify the `log4j.properties` file in the `conf` directory. It should be automatically included in the Yarn application.
Concerning your logging problem, it might be that you have set the logging level too high. Could you share the code with us?
Cheers,
Till
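For reference, a minimal log4j.properties fragment along the lines Till suggests (a sketch, not the exact shipped file; the appender name is illustrative, and ${log.file} is the system property Flink passes to its processes):

```properties
# Root logger at INFO so statements from user functions are not filtered out.
log4j.rootLogger=INFO, file

# File appender writing to the per-process log file Flink configures.
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```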
On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <[hidden email]> wrote:
Hi flink community,
I am trying to show log messages using log4j.
It works fine overall except for the messages I want to show in an inner class that implements org.apache.flink.api.common.aggregators.ConvergenceCriterion.
I am very new to this, but it seems that I’m having problems showing the messages included in the isConverged function, perhaps because it runs on the task managers?
E.g. the log messages in the outer class (before map-reduce operations) are properly shown.
I am also interested in providing my own log4j.properties file. I am using the ./bin/flink run -m yarn-cluster on Amazon clusters.
Thanks,
Ana