WordCount with Flink Docker

jp
Hello,

When I submit the WordCount job from the examples without any parameters
to a standalone cluster running on Docker, I cannot see the output of its
System.out.println() calls. Instead, I notice the
org.apache.flink.metrics.MetricGroup entry below (the last one):

jobmanager_1   | Starting Job Manager
jobmanager_1   | config file:
jobmanager_1   | jobmanager.rpc.address: jobmanager
jobmanager_1   | jobmanager.rpc.port: 6123
jobmanager_1   | jobmanager.heap.mb: 1024
jobmanager_1   | taskmanager.heap.mb: 1024
jobmanager_1   | taskmanager.numberOfTaskSlots: 1
jobmanager_1   | taskmanager.memory.preallocate: false
jobmanager_1   | parallelism.default: 1
jobmanager_1   | web.port: 8081
jobmanager_1   | blob.server.port: 6124
jobmanager_1   | query.server.port: 6125
jobmanager_1   | Starting jobmanager as a console application on host 400baa8ff371.
taskmanager_1  | Starting Task Manager
taskmanager_1  | config file:
taskmanager_1  | jobmanager.rpc.address: jobmanager
taskmanager_1  | jobmanager.rpc.port: 6123
taskmanager_1  | jobmanager.heap.mb: 1024
taskmanager_1  | taskmanager.heap.mb: 1024
taskmanager_1  | taskmanager.numberOfTaskSlots: 4
taskmanager_1  | taskmanager.memory.preallocate: false
taskmanager_1  | parallelism.default: 1
taskmanager_1  | web.port: 8081
taskmanager_1  | blob.server.port: 6124
taskmanager_1  | query.server.port: 6125
taskmanager_1  | Starting taskmanager as a console application on host 531ef6c27264.
jobmanager_1   | Slf4jLogger started
taskmanager_1  | Slf4jLogger started
taskmanager_1  | Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
taskmanager_1  | Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
taskmanager_1  | The operator name DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 characters length limit and was truncated.

I am not sure why this happens. I am using the latest Docker image (Flink
1.4.2) and the corresponding examples JAR from the 1.4.2 release.

TIA

JP

Re: WordCount with Flink Docker

Ted Yu
bq. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.

Have you noticed the above and taken the corresponding action?
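If we take the log message at its word, the fix it asks for is moving a file inside the Flink distribution. The message also implies this only matters if you actually want Queryable State. Below is a minimal stdlib-only Java sketch of that move. It assumes the usual opt/ and lib/ directories under the Flink home and a jar whose name starts with flink-queryable-state-runtime. The exact file name, such as its Scala-version suffix, is not shown in this thread. The class name, the moveQueryableStateJar helper and the /opt/flink fallback are hypothetical. Because lib/ is picked up when the JobManager and TaskManager start, the move would presumably need to happen in every container (or be baked into the image), followed by a restart.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

final class EnableQueryableState {

    // Moves every flink-queryable-state-runtime*.jar from <flinkHome>/opt into <flinkHome>/lib
    // and returns the new paths (empty if no matching jar was found in opt/).
    static List<Path> moveQueryableStateJar(Path flinkHome) throws IOException {
        Path opt = flinkHome.resolve("opt");
        Path lib = flinkHome.resolve("lib");

        // Collect the matches first so the directory is not modified while it is being listed.
        List<Path> found = new ArrayList<>();
        try (DirectoryStream<Path> jars =
                 Files.newDirectoryStream(opt, "flink-queryable-state-runtime*.jar")) {
            for (Path jar : jars) {
                found.add(jar);
            }
        }

        List<Path> moved = new ArrayList<>();
        for (Path jar : found) {
            Path target = lib.resolve(jar.getFileName());
            Files.move(jar, target, StandardCopyOption.REPLACE_EXISTING);
            moved.add(target);
        }
        return moved;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the Flink distribution lives at $FLINK_HOME inside the container;
        // /opt/flink is only a hypothetical fallback. A path given as first argument wins.
        String home = args.length > 0 ? args[0]
                : System.getenv().getOrDefault("FLINK_HOME", "/opt/flink");
        List<Path> moved = moveQueryableStateJar(Paths.get(home));
        System.out.println(moved.isEmpty() ? "No queryable-state jar found in opt/" : "Moved: " + moved);
    }
}
```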

Thanks

On Sat, Apr 21, 2018 at 3:15 PM, JP de Vooght <[hidden email]> wrote:

Re: WordCount with Flink Docker

jp

Thank you Ted!

I still get the message below...

taskmanager_1  | The operator name DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 characters length limit and was truncated.

The corresponding line 70 in WordCount.java is

System.out.println("Executing WordCount example with default input data set.");

I got the Docker image from https://github.com/docker-flink/docker-flink and still get this message, even with all defaults.


On 04/22/2018 12:39 AM, Ted Yu wrote:

Re: WordCount with Flink Docker

jp

Just an update: messages of the type "The operator name DataSource (...) exceeded the 80 characters length limit and was truncated" also occur for operations other than print statements.
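Read literally, the warning is about the length of the operator's name. It says nothing about the data set or any print statement. Below is a stdlib-only Java sketch that mimics the check the message describes. The 80-character limit comes from the message itself. The class and method names are hypothetical, and this is not Flink's actual code. The sample name is copied from the TaskManager log earlier in the thread and is well over 80 characters. That would explain why any source whose auto-generated name includes a call site plus a fully qualified input-format class triggers it.

```java
final class OperatorNameLimit {

    // Limit quoted in the warning text ("exceeded the 80 characters length limit").
    static final int MAX_LENGTH = 80;

    // Returns the name unchanged if it fits; otherwise prints a warning shaped like the
    // one in the TaskManager log and returns only the first MAX_LENGTH characters.
    static String truncate(String name) {
        if (name != null && name.length() > MAX_LENGTH) {
            System.out.println("The operator name " + name + " exceeded the " + MAX_LENGTH
                    + " characters length limit and was truncated.");
            return name.substring(0, MAX_LENGTH);
        }
        return name;
    }

    public static void main(String[] args) {
        // Operator name copied from the TaskManager log in this thread.
        String name = "DataSource (at getDefaultTextLineDataSet(WordCountData.java:70) "
                + "(org.apache.flink.api.java.io.CollectionInputFormat))";
        System.out.println(name.length() + " characters");
        System.out.println(truncate(name));
    }
}
```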

I am still new to Flink and am specifically interested in print/logging statements. Running WordCount in batch mode with explicit input and output works well, but I cannot see the print output anywhere...

So I took a different approach and submitted the following simpler job, which uses logging instead of printing to the console:

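import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleExample {
        // Logger named after this class
        private static final Logger LOG = LoggerFactory.getLogger(SimpleExample.class);

        public static void main(String[] args) throws Exception {
                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);
                LOG.info("########## BEFORE UPDATEMODEL ##########");
                // Keep values > 30 and sum them; collect() runs the job and returns the result to this program
                List<Integer> collect = amounts.filter(a -> a > 30).reduce((integer, t1) -> integer + t1).collect();
                LOG.info("########## AFTER UPDATEMODEL ##########");
                LOG.info(collect.get(0).toString());
        }
}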

The logs only show the first statement (BEFORE). Why? I do not understand how to use either printing or logging reliably...

jobmanager_1   | Starting Job Manager
jobmanager_1   | config file: 
jobmanager_1   | jobmanager.rpc.address: jobmanager
jobmanager_1   | jobmanager.rpc.port: 6123
jobmanager_1   | jobmanager.heap.mb: 1024
jobmanager_1   | taskmanager.heap.mb: 1024
jobmanager_1   | taskmanager.numberOfTaskSlots: 1
jobmanager_1   | taskmanager.memory.preallocate: false
jobmanager_1   | parallelism.default: 1
jobmanager_1   | web.port: 8081
jobmanager_1   | blob.server.port: 6124
jobmanager_1   | query.server.port: 6125
jobmanager_1   | Starting jobmanager as a console application on host b66ca9ea48e2.
taskmanager_1  | Starting Task Manager
taskmanager_1  | config file: 
taskmanager_1  | jobmanager.rpc.address: jobmanager
taskmanager_1  | jobmanager.rpc.port: 6123
taskmanager_1  | jobmanager.heap.mb: 1024
taskmanager_1  | taskmanager.heap.mb: 1024
taskmanager_1  | taskmanager.numberOfTaskSlots: 4
taskmanager_1  | taskmanager.memory.preallocate: false
taskmanager_1  | parallelism.default: 1
taskmanager_1  | web.port: 8081
taskmanager_1  | blob.server.port: 6124
taskmanager_1  | query.server.port: 6125
taskmanager_1  | Starting taskmanager as a console application on host 5f6223cd3e71.
jobmanager_1   | Slf4jLogger started
taskmanager_1  | Slf4jLogger started
jobmanager_1   | ########## BEFORE UPDATEMODEL ##########
taskmanager_1  | The operator name DataSource (at main(SimpleExample.java:30) (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 characters length limit and was truncated.

Any advice?


On 04/22/2018 10:33 AM, JP de Vooght wrote:

Re: WordCount with Flink Docker

Ted Yu
Looking at flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java, line 70 is

    return env.fromElements(WORDS);

possibly indicating some long line in the DataSet.

FYI

On Sun, Apr 22, 2018 at 1:33 AM, JP de Vooght <[hidden email]> wrote:

Re: WordCount with Flink Docker

Ted Yu
bq. at getDefaultTextLineDataSet(WordCountData.java:70)(org.apache.flink.api.java.io.CollectionInputFormat)

I think the above was part of a stack trace indicating some parsing problem (similarly with "at main(SimpleExample.java:30)").

In your program, I guess line 30 was for the amounts.filter call.
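The fragment main(SimpleExample.java:30) has the method(File.java:line) shape of a stack frame. That same shape can also be produced on purpose, as a plain call-site label attached to an operator when it is created, without any error occurring. This thread does not settle which of the two these DataSource names are. Below is a stdlib-only Java sketch of the call-site variant. CallLocation and createSource are hypothetical names, with createSource standing in for something like env.fromElements(...). The frame indexing assumes a HotSpot-style JVM, which may omit frames in unusual cases.

```java
final class CallLocation {

    // Renders a frame in the "method(File.java:line)" shape seen in the log lines above.
    static String format(StackTraceElement frame) {
        return String.format("%s(%s:%d)",
                frame.getMethodName(), frame.getFileName(), frame.getLineNumber());
    }

    // Returns the frame `depth` levels up the current thread's stack, formatted as above.
    // Index 0 is Thread.getStackTrace itself, 1 is this method, 2 is our caller, and so on.
    static String callLocation(int depth) {
        StackTraceElement[] stack = Thread.currentThread().getStackTrace();
        return stack.length > depth ? format(stack[depth]) : "<unknown>";
    }

    // Hypothetical stand-in for a source-creating call: it labels the result with the
    // location of its own caller (depth 3). No exception is involved.
    static String createSource() {
        return "DataSource (at " + callLocation(3) + ")";
    }

    public static void main(String[] args) {
        // Prints a label of the form "DataSource (at main(<File>.java:<line>))".
        System.out.println(createSource());
    }
}
```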

Please check the JobManager / TaskManager logs to see if you can find the complete stack trace.
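When searching the JobManager/TaskManager output, real stack traces have the usual JVM markers that distinguish them from warnings like the truncation message: an exception class name, "Caused by:" lines, and indented "at ..." frames. Below is a stdlib-only Java sketch that picks out those lines. It assumes the output was first saved to a file (for example by redirecting docker-compose logs) and that each line may carry docker-compose's "service  | " prefix. The class name and regular expressions are illustrative and not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

final class StackTraceScan {

    // docker-compose prefixes each line with "<service>  | ".
    private static final Pattern COMPOSE_PREFIX = Pattern.compile("^[\\w.-]+\\s+\\| ");
    // An indented JVM stack frame, e.g. "\tat com.example.Foo.bar(Foo.java:42)".
    private static final Pattern FRAME = Pattern.compile("^\\s+at \\S+\\(.*\\)$");
    // An exception header, optionally preceded by "Caused by: ".
    private static final Pattern HEADER =
            Pattern.compile("^(Caused by: )?([\\w$]+\\.)+[\\w$]*(Exception|Error)(: .*)?$");

    // Returns the lines that look like parts of a stack trace, with prefix and indentation removed.
    static List<String> stackTraceLines(List<String> logLines) {
        List<String> hits = new ArrayList<>();
        for (String raw : logLines) {
            String line = COMPOSE_PREFIX.matcher(raw).replaceFirst("");
            if (FRAME.matcher(line).matches() || HEADER.matcher(line).matches()) {
                hits.add(line.trim());
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: StackTraceScan <saved-log-file>");
            return;
        }
        stackTraceLines(Files.readAllLines(Paths.get(args[0]))).forEach(System.out::println);
    }
}
```

With only the lines quoted in this thread as input, the scan would come back empty, since none of them is an exception header or an indented frame.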

On Sun, Apr 22, 2018 at 6:55 AM, Ted Yu <[hidden email]> wrote: