logging question

jp
Hello,
I am learning Flink, using the Docker image together with the AMIDST
library.
Below is a sample task from AMIDST which produces INFO output up until I
reach updateModel(). I pasted that short method as well and wonder what
prevents the Logger from producing any further output.

        //Set-up Flink session
        env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");

        //generate a random dataset
        DataFlink<DataInstance> dataFlink =
                new DataSetGenerator().generate(env, 1234, 1000, 5, 0);

        //Creates a DAG with the NaiveBayes structure for the random dataset
        DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(), "DiscreteVar4");
        LOG.info(dag.toString());

        //Create the Learner object
        ParameterLearningAlgorithm learningAlgorithmFlink = new ParallelMaximumLikelihood();

        //Learning parameters
        learningAlgorithmFlink.setBatchSize(10);
        learningAlgorithmFlink.setDAG(dag);

        //Initialize the learning process
        learningAlgorithmFlink.initLearning();

        //Learn from the flink data
        LOG.info("BEFORE UPDATEMODEL");
        learningAlgorithmFlink.updateModel(dataFlink);
        LOG.info("AFTER UPDATEMODEL");

        //Print the learnt Bayes Net
        BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
        LOG.info(bn.toString());


Below is the updateModel method.

    public double updateModel(DataFlink<DataInstance> dataUpdate) {
        try {
            Configuration config = new Configuration();
            config.setString(BN_NAME, this.dag.getName());
            config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));

            DataSet<DataInstance> dataset = dataUpdate.getDataSet();
            this.sumSS = dataset.map(new SufficientSatisticsMAP())
                    .withParameters(config)
                    .reduce(new SufficientSatisticsReduce())
                    .collect().get(0);

            //Add the prior
            sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());

            JobExecutionResult result = dataset.getExecutionEnvironment().getLastJobExecutionResult();

            numInstances = result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME + "_" + this.dag.getName());
            numInstances++; //Initial counts

        } catch (Exception ex) {
            throw new UndeclaredThrowableException(ex);
        }

        return this.getLogMarginalProbability();
    }


Not sure why the LOG.info calls past that method are not output to the console.
TIA
JP

Re: logging question

Nico Kruber
Just a guess, but probably our logging initialisation changes the global
log level (see conf/log4j.properties). DataStream.collect() executes the
program along with creating a local Flink "cluster" (if you are testing
locally / in an IDE) and initializing logging, among other things.

Please comment the first line out and uncomment the following one to
read like this:
==========
# This affects logging for both user code and Flink
#log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=INFO
==========
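
If in doubt, you can also verify at runtime which level actually
applies. A minimal sketch, assuming log4j 1.x on the classpath (which
is what Flink 1.4 ships with); the class name is just a placeholder:

import org.apache.log4j.Level;
import org.apache.log4j.LogManager;

public class LogLevelCheck {
    public static void main(String[] args) {
        // Effective level of the root logger (set via log4j.rootLogger=...)
        Level root = LogManager.getRootLogger().getEffectiveLevel();
        System.out.println("root logger: " + root);

        // Effective level of your logger; it inherits from the root logger
        // unless a more specific log4j.logger.* entry matches its name
        Level app = LogManager.getLogger(">>>>> ParallelMLExample").getEffectiveLevel();
        System.out.println("example logger: " + app);
    }
}

Printing these before and after the collect() call should show whether
the local execution re-initialises the log level.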


Nico


Re: logging question

jp
Hello Nico,

It took me a while to respond; thank you for the comments. I explored
the Docker image and startup scripts a little more, which helped me
better understand which log4j properties file is used, but I am still
facing this odd behavior.

I created a Stack Overflow entry for this:

https://stackoverflow.com/questions/48853497/docker-flink-not-showing-all-log-statements

Below is the properties file, which I hadn't put on SO.

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file, console

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=OFF

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to
# manually change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
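
One thing I notice while re-reading it: log4j.logger.org.apache.flink=OFF
silences Flink's own logging entirely, while my ">>>>> ParallelMLExample"
logger still inherits INFO from the root logger, so in theory the
statements around updateModel() should all reach the console and the file.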

JP



Re: logging question

Nico Kruber
I'm a bit curious how you hand your log4j configuration to the Docker
image for consumption. On SO you refer to bin/flink-console.sh, but
executing Flink in Docker is a bit different.
Maybe I'm wrong, but looking at the sources of the Docker image [1], it
does not seem to forward any additional parameters you append to the
command starting the container.


Nico

[1]
https://github.com/docker-flink/docker-flink/tree/master/1.4/hadoop28-scala_2.11-alpine
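
If that is the case, then rather than passing anything on the command
line, overriding the file the image already ships, e.g. by mounting
your own properties file over the one under /opt/flink/conf, is
probably the more reliable route.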


Re: logging question

jp
In the docker-compose.yaml I have a volume entry which maps my
log4j.properties to /opt/flink/conf/log4j-console.properties.

Not pretty, but it works once I had determined how Flink was being
launched: the image's entrypoint starts the processes in the foreground,
and in that mode bin/flink-console.sh points log4j at
log4j-console.properties rather than log4j.properties. See below.


version: "2.1"
 
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      -
/c/Users/XYZ/playground/flink/shared/log4j.properties:/opt/flink/conf/log4j-console.properties

    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - FLINK_ENV_JAVA_OPTS=-Dlog.file=/opt/flink/log/jobmanager.log
 
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    volumes:
      -
/c/Users/XYZ/playground/flink/shared/log4j.properties:/opt/flink/conf/log4j-console.properties

    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - FLINK_ENV_JAVA_OPTS=-Dlog.file=/opt/flink/log/taskmanager.log



Re: logging question

jp
In reply to this post by Nico Kruber

Nico, all,

I am still stuck with this. I upgraded the Docker image to 1.4.2 and the AMIDST library to 0.7.0.

Just noticed this issue, which reports logging problems outside transforms: https://issues.apache.org/jira/browse/FLINK-7990

Could this be related? I don't see the relation to logback, though, since I am using log4j.

Below is the library code invoked after the "BEFORE UPDATEMODEL" log statement.

    try {
        Configuration config = new Configuration();
        config.setString(BN_NAME, this.dag.getName());
        config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));

        DataSet<DataInstance> dataset = dataUpdate.getDataSet();
        this.sumSS = dataset.map(new SufficientSatisticsMAP())
                .withParameters(config)
                .reduce(new SufficientSatisticsReduce())
                .collect().get(0);

        //Add the prior
        sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());

        JobExecutionResult result = dataset.getExecutionEnvironment().getLastJobExecutionResult();

        numInstances = result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME + "_" + this.dag.getName());
        numInstances++; //Initial counts

    } catch (Exception ex) {
        throw new UndeclaredThrowableException(ex);
    }
JP


Re: logging question

jp
Just to recap:

I use Flink 1.4.2 with Docker Compose, which launches a jobmanager and a taskmanager.
My hope is to learn another library on top of Flink, so logging is important to me.
I start the cluster and deploy the following job (I dropped all calls to that library so I can focus on plain Flink and Docker):

import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelMLExample {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelMLExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env;

        //Set-up Flink session
        env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();

        DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

        LOG.info("########## BEFORE UPDATEMODEL ##########");
        //collect() triggers execution of the job
        List<Integer> collect = amounts.filter(a -> a > 30).reduce((integer, t1) -> integer + t1).collect();
        LOG.info("########## AFTER UPDATEMODEL ##########");

        LOG.info(collect.get(0).toString());
    }
}
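
For reference, the filter keeps 40 and 50, so collect() should return a
single element, 90. Judging from the taskmanager line below about the
DataSource operator, the job does get submitted; it is the log
statements after collect() that never show up.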

Log output of the jobmanager does not show anything after "BEFORE UPDATEMODEL":

$ docker-compose up
Starting flink_jobmanager_1
Starting flink_taskmanager_1
Attaching to flink_jobmanager_1, flink_taskmanager_1
jobmanager_1   | Starting Job Manager
jobmanager_1   | config file:
jobmanager_1   | jobmanager.rpc.address: jobmanager
jobmanager_1   | jobmanager.rpc.port: 6123
jobmanager_1   | jobmanager.heap.mb: 1024
jobmanager_1   | taskmanager.heap.mb: 1024
jobmanager_1   | taskmanager.numberOfTaskSlots: 1
jobmanager_1   | taskmanager.memory.preallocate: false
jobmanager_1   | parallelism.default: 1
jobmanager_1   | web.port: 8081
jobmanager_1   | blob.server.port: 6124
jobmanager_1   | query.server.port: 6125
jobmanager_1   | blob.server.port: 6124
jobmanager_1   | query.server.port: 6125
jobmanager_1   | blob.server.port: 6124
jobmanager_1   | query.server.port: 6125
taskmanager_1  | Starting Task Manager
taskmanager_1  | config file:
taskmanager_1  | jobmanager.rpc.address: jobmanager
taskmanager_1  | jobmanager.rpc.port: 6123
taskmanager_1  | jobmanager.heap.mb: 1024
taskmanager_1  | taskmanager.heap.mb: 1024
taskmanager_1  | taskmanager.numberOfTaskSlots: 4
taskmanager_1  | taskmanager.memory.preallocate: false
taskmanager_1  | parallelism.default: 1
taskmanager_1  | web.port: 8081
taskmanager_1  | blob.server.port: 6124
taskmanager_1  | query.server.port: 6125
taskmanager_1  | blob.server.port: 6124
taskmanager_1  | query.server.port: 6125
taskmanager_1  | blob.server.port: 6124
taskmanager_1  | query.server.port: 6125
jobmanager_1   | Starting jobmanager as a console application on host e207d6ad4a1a.
taskmanager_1  | Starting taskmanager as a console application on host 1d724ce8ae5e.
jobmanager_1   | Slf4jLogger started
taskmanager_1  | Slf4jLogger started
taskmanager_1  | Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
taskmanager_1  | Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
jobmanager_1   | ########## BEFORE UPDATEMODEL ##########
taskmanager_1  | The operator name DataSource (at main(ParallelMLExample.java:30) (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 characters length limit and was truncated.




Re: logging question

jp

I finally sorted my problem out. I switched to the CLI instead of the Web UI and debugged with simple System.out.println() statements; with those I noticed that a local installation launched with start-cluster.sh worked fine.
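
The check itself was nothing fancy. A minimal sketch of what I ran
(class name and values are just placeholders from my setup):

import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;

public class PrintlnCheck {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        System.out.println("before collect");
        // collect() submits the job and blocks until the result is back
        List<Integer> out = env.fromElements(1, 2, 3).collect();
        // This line runs in the client JVM, i.e. the CLI, so it shows up
        // in the CLI's terminal rather than in the jobmanager's log.
        System.out.println("after collect: " + out);
    }
}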

In order to reproduce a simple learning environment with Docker images, I ended up creating my own image without that entrypoint.sh and ensuring the CLI would run from an image on the same bridge network. That image also includes some libraries from opt. That's about it really.

docker run --rm -t --net "amidst_default" -v /home/jdevoo/playground/amidst/target:/opt/flink/target flink flink run -m jobmanager:6123 target/ParallelMLExample-0.0.1.jar
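
In that command, --net "amidst_default" attaches the CLI container to
the default bridge network Compose created for the project, the -v
mount makes the built jar visible inside the container, and
-m jobmanager:6123 points the flink CLI at the jobmanager service.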

docker-compose.yaml is below:
version: "2.0"
services:
  jobmanager:
    image: flink
    volumes:
      - /home/jdevoo/playground/amidst/conf:/opt/flink/conf
    ports:
      - "8081:8081"
    command: jobmanager.sh start-foreground cluster
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink
    volumes:
      - /home/jdevoo/playground/amidst/conf:/opt/flink/conf
    depends_on:
      - jobmanager
    command: taskmanager.sh start-foreground
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

Going to enjoy some AMIDST solutions to ML problems now :-)

JP

