Exception using flink-connector-elasticsearch

Exception using flink-connector-elasticsearch

Lopez, Javier
Hi,

We are using the Elasticsearch sink, and when we try to run our job we get the following exception:

java.lang.ExceptionInInitializerError
Caused by: java.lang.IllegalArgumentException: An SPI class of type org.apache.lucene.codecs.Codec with name 'Lucene410' does not exist.  You need to add the corresponding JAR file supporting this SPI to your classpath.  The current classpath supports the following names: []

We are using embedded nodes, and we don't know whether we are missing some configuration for the Elasticsearch client. This is the code we are using:

Map<String, String> config = Maps.newHashMap();
// Flush after every single action, i.e. no batching of bulk requests
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "flink-test");

result.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<Tuple4<String, Double, Long, Double>>() {
    @Override
    public org.elasticsearch.action.index.IndexRequest createIndexRequest(Tuple4<String, Double, Long, Double> element, RuntimeContext ctx) {
        // Wrap the whole tuple in a single "data" field of the document
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return org.elasticsearch.client.Requests.indexRequest()
                .index("stream_test_1")
                .type("aggregation_test")
                .source(json);
    }
}));

The Flink server and the Elasticsearch server are both running on the same local machine.

Thanks for your help

Re: Exception using flink-connector-elasticsearch

Till Rohrmann

Hi Javier,

it seems that you are either missing the lucene-codecs jar on your classpath or have the wrong version (not 4.10.4). Could you check whether your job jar includes lucene-codecs? If so, could you run mvn dependency:tree in the root directory of your project? There you should see which version of lucene-codecs is included and where it comes from.

Cheers,
Till
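
The jar check suggested above can also be done programmatically. The following is a minimal sketch using only the JDK, under the assumption that the job jar is a regular (fat) jar on local disk. The class name JobJarInspector and its method are hypothetical helpers, not part of Flink, Elasticsearch, or Lucene. It lists every jar entry whose name contains a given fragment, which shows both whether Lucene classes are bundled and whether an SPI registration file under META-INF/services made it into the jar.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper for illustration only; not part of any library.
class JobJarInspector {

    // Returns the names of all entries in the jar that contain the fragment,
    // e.g. "org/apache/lucene/codecs/" or "META-INF/services/org.apache.lucene".
    static List<String> entriesContaining(Path jar, String fragment) throws IOException {
        List<String> matches = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.contains(fragment)) {
                    matches.add(name);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: java JobJarInspector <path-to-job-jar>");
            return;
        }
        Path jar = Paths.get(args[0]);
        // SPI registration files for Lucene (codecs, postings formats, ...)
        for (String name : entriesContaining(jar, "META-INF/services/org.apache.lucene")) {
            System.out.println("SPI file: " + name);
        }
        // Number of bundled Lucene codec classes
        System.out.println("Codec classes: "
                + entriesContaining(jar, "org/apache/lucene/codecs/").size());
    }
}
```

If the codec classes are present but no SPI file is listed, the build may have dropped or overwritten the registration files while assembling the jar, which would be consistent with the empty list of names in the exception.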


On Tue, Jan 12, 2016 at 11:55 AM, Lopez, Javier <[hidden email]> wrote:


Re: Exception using flink-connector-elasticsearch

Aljoscha Krettek-2
In reply to this post by Lopez, Javier
Hi,
could you please try adding the lucene-core-4.10.4.jar file to the lib folder of Flink (https://repo1.maven.org/maven2/org/apache/lucene/lucene-core/4.10.4/)? Elasticsearch uses dependency injection to resolve the classes, and Maven is not really aware of this.

You could also try adding lucene-codecs-4.10.4.jar (https://repo1.maven.org/maven2/org/apache/lucene/lucene-codecs/4.10.4/).

Cheers,
Aljoscha
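
After adding the jars, one way to confirm that the codec registration is visible is to look for the SPI registration file on the classpath. The sketch below relies on the fact that Lucene discovers codecs through files under META-INF/services, which is also what the "SPI class" wording in the exception refers to. The class name SpiRegistrationCheck is a hypothetical helper, and the check only sees the class loader it runs in, so the result inside a Flink job may differ from a standalone run.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not part of any library.
class SpiRegistrationCheck {

    // Returns the locations of all registration files for the given service
    // interface that the current class loader can see.
    static List<URL> registrationFiles(String serviceInterface) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        return Collections.list(loader.getResources("META-INF/services/" + serviceInterface));
    }

    public static void main(String[] args) throws IOException {
        String service = "org.apache.lucene.codecs.Codec";
        List<URL> files = registrationFiles(service);
        if (files.isEmpty()) {
            System.out.println("No registration file for " + service + " on the classpath");
        }
        for (URL url : files) {
            // Each URL names the jar that contributes codec registrations
            System.out.println(url);
        }
    }
}
```

An empty result would match the empty list of supported names reported in the exception.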



global function over partitions

Radu Tudoran
Hi,

I am trying to compute some final statistics over a stream topology. For this I would like to gather all data from all windows and parallel partitions into a single/global window. Could you suggest a solution for this? I saw that the map function has a ".global()", but I end up with the same number of partitions as I have in the main computation. Below you can find a schema of the program:


DataStream stream = env.Read...

env.setParallelism(10);
// Compute phase
DataStream<Tuple...> result = stream.keyBy(_).window(_).apply();
// end compute phase


// get the metrics
result.map(/* extract some of the Tuple fields */).global().timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
        .trigger(EventTimeTrigger.create()).apply().writeAsText();


For this last function, I would expect that even with parallel computation during the compute phase, I can select part of the events from all partitions and gather them all into one unique window. However, I do not seem to succeed in this.
I also tried applying a keyBy() to the result stream in which I assigned the same hash to every event, but the result remains the same.
result.map(/* extract some of the Tuple fields */).keyBy(
        new KeySelector<Tuple2<Long, Long>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Long, Long> arg0) throws Exception {
                return 1;
            }
            @Override
            public int hashCode() {
                return 1;
            }
        }).timeWindowAll().apply()


Thanks for the help/ideas




Re: Exception using flink-connector-elasticsearch

Lopez, Javier
In reply to this post by Aljoscha Krettek-2
Hi,

Thanks Aljoscha, the libraries solved the problem. It worked perfectly!

On 12 January 2016 at 14:03, Aljoscha Krettek <[hidden email]> wrote:



Re: global function over partitions

rmetzger0
In reply to this post by Radu Tudoran
Hi Radu,

I'm sorry for the delayed response.
I'm not sure what the purpose of DataStream.global() actually is. I've opened a JIRA to document or remove it: https://issues.apache.org/jira/browse/FLINK-3240.

For getting the final metrics, you can just call ".timeWindowAll()", without a ".global()" call before. The timeWindowAll() will run with a parallelism of one, hence it will receive the data from all partitions.

Regards,
Robert
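
To make the "all partitions into one window" idea concrete, here is a plain-Java model of a non-keyed tumbling window. It is not Flink code. The class GlobalTumblingWindowModel and its methods are hypothetical, and the sketch assumes non-negative millisecond timestamps and windows aligned to multiples of the window size. Records from every partition are assigned to a window by timestamp alone, so a single instance sees all of them.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model for illustration only; this does not use the Flink API.
class GlobalTumblingWindowModel {

    // Start of the tumbling window that contains the timestamp.
    // Assumes timestampMillis >= 0 and sizeMillis > 0.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Counts records per window across all partitions. The partition a
    // record came from plays no role in the window assignment.
    static Map<Long, Integer> countPerWindow(long[][] partitions, long sizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long[] partition : partitions) {
            for (long timestamp : partition) {
                counts.merge(windowStart(timestamp, sizeMillis), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[][] partitions = {
            {0L, 500L, 2100L},   // timestamps seen by partition 0
            {1999L, 3500L}       // timestamps seen by partition 1
        };
        // One count per 2-second window, regardless of the source partition
        System.out.println(countPerWindow(partitions, 2000L));
    }
}
```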



 

On Tue, Jan 12, 2016 at 6:59 PM, Radu Tudoran <[hidden email]> wrote:





RE: global function over partitions

Radu Tudoran

Hi,

Thanks for the response.

1) Regarding the JIRA issue about the .global and .forward functions: I believe removing them is a good idea, as they are confusing. In fact, they are completely missing from the documentation page

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows

which makes their role and capabilities even less clear.

2) Regarding ".timeWindowAll()": its behavior is not what I would expect, and I am not sure whether this is intentional or an error. As mentioned in my initial email, I would expect that even if the previous operators run with a parallelism of N, this function gives me a parallelism of 1, so that I can aggregate the data from the previous operators in one place. However, this is not really the case. More specifically, it is not the case when the function is executed on a cluster with several machines (it works fine in the local case!). It turns out that the parallelism degree is kept when running on the cluster (my guess is that the function is executed round-robin over the executors). So if you use this function to aggregate all data in one place, you end up aggregating it over multiple parallel instances. I am attaching a dummy piece of code below as an example.

The program reads events from a network socket and multiplies these events based on the parallelism degree. The stream is partitioned by key. This is followed by the "main computation", which runs in parallel, and finally by an aggregation part. For this aggregation part I use ".timeWindowAll()", as suggested. Assume that this aggregation function counts the events processed in the system across all instances and prints/logs this data.

For example, if you run this with a parallelism degree of 10, you end up with outputs from the timeWindowAll() across all instances in the cluster. A sample output is shown below. This shows that although the function should be executed with parallelism 1, it actually is not, so it cannot aggregate the data in one place. Is this the intended behavior (in which case I would be interested to understand the target scenario), or is there an error?

Machine1:                (values in the file)
/tmp/testoutput/1   (10)
/tmp/testoutput/2   (20)

Machine2:
/tmp/testoutput/6   (10)
/tmp/testoutput/4
/tmp/testoutput/7   (40)

Machine3:
/tmp/testoutput/5
/tmp/testoutput/3

……………


public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment();

    final int parallelism = 10;
    env.setParallelism(parallelism);

    // Read lines from a socket and emit one copy of each line per key 0..parallelism-1
    DataStream<Tuple2<Integer, String>> inputStream = env.socketTextStream(
            hostIP, port2use, '\n').flatMap(
            new FlatMapFunction<String, Tuple2<Integer, String>>() {

                @Override
                public void flatMap(String arg0,
                        Collector<Tuple2<Integer, String>> arg1)
                        throws Exception {

                    for (int i = 0; i < parallelism; i++)
                        arg1.collect(new Tuple2<Integer, String>(i, arg0));
                }

            });

    // Main computation: keyed 2-second windows, processed in parallel
    DataStream<Tuple2<Integer, Integer>> result = inputStream
            .keyBy(0)
            .timeWindow(Time.of(2, TimeUnit.SECONDS))
            .apply(new WindowFunction<Tuple2<Integer, String>, Tuple2<Integer, Integer>, Tuple, TimeWindow>() {
                public void apply(
                        Tuple arg0,
                        TimeWindow arg1,
                        java.lang.Iterable<org.apache.flink.api.java.tuple.Tuple2<Integer, String>> arg2,
                        org.apache.flink.util.Collector<org.apache.flink.api.java.tuple.Tuple2<Integer, Integer>> arg3)
                        throws Exception {

                    // Computation ....
                    int count = 0;
                    for (Tuple2<Integer, String> value : arg2) {
                        count++;
                        arg3.collect(new Tuple2<Integer, Integer>(value.f0,
                                value.f1.length()));
                    }
                    //System.out.println("Count per hash is " + count);
                }

            });

    // Aggregation: count all events of all keys in one 2-second window
    result.timeWindowAll(Time.of(2, TimeUnit.SECONDS))
            .apply(new AllWindowFunction<Tuple2<Integer, Integer>, Tuple1<Integer>, TimeWindow>() {
                @Override
                public void apply(TimeWindow arg0,
                        Iterable<Tuple2<Integer, Integer>> arg1,
                        Collector<Tuple1<Integer>> arg2) throws Exception {

                    // Computation ....
                    int count = 0;
                    for (Tuple2<Integer, Integer> value : arg1) {
                        count++;
                    }
                    //System.out.println("Count aggregated metrics is "
                    //            + count + " at " + System.currentTimeMillis());
                    arg2.collect(new Tuple1<Integer>(count));

                }
            }).setParallelism(1)
            .writeAsText("/tmp/testoutput", WriteMode.OVERWRITE);

    env.execute("main stream application");

}

 

 

 

Regards,

 

 

Dr. Radu Tudoran

Research Engineer - Big Data Expert

IT R&D Division

 


HUAWEI TECHNOLOGIES Duesseldorf GmbH

European Research Center


 

From: Robert Metzger [mailto:[hidden email]]
Sent: Friday, January 15, 2016 10:18 AM
To: [hidden email]
Subject: Re: global function over partitions

 



 


Re: global function over partitions

Aljoscha Krettek
Hi,
I think the reason you are seeing output on all parallel machines is that the sink itself has parallelism=10 again. So even though there is only one parallel instance of the all-window operator, its results get shipped (round-robin) to the 10 parallel instances of the file sink.

By default, streaming operations don’t adapt to the parallelism of upstream operations, so if you want that you also have to specify the parallelism of the sink.

Cheers,
Aljoscha
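
The effect described above can be modelled in plain Java. This is not Flink code. The class SinkParallelismModel and its method are hypothetical, and the sketch assumes a strict round-robin hand-off from the single window operator instance to the sink subtasks, with each subtask writing its own output file. With 10 sink subtasks the results are scattered over several files and some files stay empty, which resembles the listing in the question. With 1 sink subtask everything lands in one place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model for illustration only; this does not use the Flink API.
class SinkParallelismModel {

    // Distributes the records of one upstream instance over the sink
    // subtasks in round-robin order. Entry i of the result holds what
    // sink subtask i would write to its own file.
    static List<List<Integer>> distributeRoundRobin(List<Integer> records, int sinkParallelism) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < sinkParallelism; i++) {
            subtasks.add(new ArrayList<Integer>());
        }
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % sinkParallelism).add(records.get(i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // Window results emitted by the single all-window operator instance
        List<Integer> windowCounts = Arrays.asList(10, 20, 10, 40);

        // Sink parallelism 10: results are spread out, most files stay empty
        System.out.println(distributeRoundRobin(windowCounts, 10));

        // Sink parallelism 1: all results end up in a single file
        System.out.println(distributeRoundRobin(windowCounts, 1));
    }
}
```

In the program from the question, the setParallelism(1) call is applied to the window operator, while the writeAsText sink that follows keeps the environment's default parallelism. Calling setParallelism(1) on the sink returned by writeAsText as well should give a single output.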

> On 15 Jan 2016, at 15:08, Radu Tudoran <[hidden email]> wrote: