Logs printed when running with minicluster but not printed when submitted as a job

Logs printed when running with minicluster but not printed when submitted as a job

Manas Kale
Hi,
I have the following pattern:

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public static void main(String[] args) throws Exception {
    // Get the execution environment. This could be a cluster or a mini-cluster used for local development.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Make the Flink runtime use event time as the time characteristic.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Generate a watermark every WATERMARK_PERIODICITY ms.
    env.getConfig().setAutoWatermarkInterval(WATERMARK_PERIODICITY);
    Config.readProperties();
}

class Config {
    private static final Logger LOGGER = LoggerFactory.getLogger(Config.class);

    // Populates configuration variables with values read from the config file.
    public static void readProperties() throws Exception {
        Properties prop = new Properties();

        String propFileLocation = System.getenv("FLINK_CONFIG_LOCATION");
        if (propFileLocation == null) {
            System.err.println("Properties file pointer env variable FLINK_CONFIG_LOCATION missing!");
            System.exit(1);
        }

        FileInputStream is = null;
        try {
            is = new FileInputStream(new File(propFileLocation));
        } catch (Exception e) {
            System.err.println("File " + propFileLocation + " not found!");
            System.exit(1);
        }

        prop.load(is);
        LOGGER.info("......"); // prints content read from the properties file
    }
}

When I run this program in a minicluster, I can see the LOGGER.info() output in my console.
However, when I submit this job as a JAR to a Flink cluster, the Config class's LOGGER.info() line above is never printed in the TaskManager's logs! I don't understand why this is happening, because log statements from other operators are definitely being printed in the log files on the cluster. What am I doing wrong?

My log4j.properties file is:
log4j.rootLogger=INFO, console, fileAppender

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.appender.fileAppender.File=dataProcessingEngine.log
log4j.appender.fileAppender.policies.type = Policies
log4j.appender.fileAppender.policies.size.type = SizeBasedTriggeringPolicy
log4j.appender.fileAppender.policies.size.size=10MB
log4j.appender.fileAppender.strategy.type = DefaultRolloverStrategy
log4j.appender.fileAppender.strategy.max = 5

Thank you,
Manas Kale




Re: Logs printed when running with minicluster but not printed when submitted as a job

Chesnay Schepler
The main method is executed in the JobManager process and never reaches the TaskExecutors (only the individual functions do).
As such you have to take a peek into the JobManager logs.
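
For illustration, here is a minimal sketch (a hypothetical job with assumed names, not from the thread) of where each log statement ends up: the call in main() executes in the process that runs the program's entry point, while the call inside the operator executes on the TaskManagers and therefore shows up in the taskmanager logs.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WhereLogsGo {
    private static final Logger LOGGER = LoggerFactory.getLogger(WhereLogsGo.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Runs in the process executing main() (not on a TaskManager),
        // so it never appears in the taskmanager logs.
        LOGGER.info("Building the job graph");

        env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        // Runs on a TaskManager, so it appears in the taskmanager logs.
                        LOGGER.info("Processing {}", value);
                        return value;
                    }
                })
                .print();

        env.execute("where-logs-go");
    }
}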


Re: Logs printed when running with minicluster but not printed when submitted as a job

Manas Kale
Hi Chesnay,
I checked the JobManager logs - the log statement isn't there either.


Re: Logs printed when running with minicluster but not printed when submitted as a job

Chesnay Schepler
Hold on, let us clarify how you submit the job.

Do you upload the jar via the WebUI, or with the CLI (e.g., ./bin/flink run ...)?

If it is the former, then it should show up in the JM logs.
If it is the latter, then it should appear in the logs of the client (i.e., log/flink-???-client-???.log).
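
For example (a sketch assuming a standalone distribution; the exact log file name varies with the user and host, which the ??? above stand for):

./bin/flink run /path/to/your-job.jar

# main() ran in the CLI client process, so its log output lands in the client log:
ls log/flink-*-client-*.log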



Re: Logs printed when running with minicluster but not printed when submitted as a job

Manas Kale
I see, thanks for that clarification - I incorrectly assumed both methods of submission produce logs in the same place. I will have an update tomorrow!


Re: Logs printed when running with minicluster but not printed when submitted as a job

Manas Kale
Thank you, Chesnay. I found the logs being printed in the standalone session when I used the CLI to submit the job. However, this only deepens the mystery of the configuration file from the other thread: I see from the logs that the configuration values are being read correctly, but when these values are actually used, they are null!
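
A plausible cause, though not confirmed in this thread (an assumption based on the pattern shown above): main() runs only in the client/JobManager process, so static fields populated there by readProperties() are never shipped to the TaskManagers, where they keep their default value (null). A minimal sketch of one way around this, passing the values into a function's constructor so they are serialized and shipped with the function (ThresholdFilter and Config.getThreshold() are hypothetical names):

import org.apache.flink.api.common.functions.RichFilterFunction;

public class ThresholdFilter extends RichFilterFunction<Long> {
    // Instance field: serialized with the function and shipped to the TaskManagers.
    private final long threshold;

    public ThresholdFilter(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(Long value) {
        // Runs on a TaskManager with the value that was captured in main().
        return value > threshold;
    }
}

// In main(), after Config.readProperties():
// stream.filter(new ThresholdFilter(Config.getThreshold()));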
