NPE with Flink Streaming from Kafka

RE: Question about DataStream serialization

Radu Tudoran

Hi,

 

Taking the example you mentioned, of using a RichFlatMapFunction and reading a file in open():

Would this open function be executed on each node where the RichFlatMapFunction gets executed? (I have done some tests and it seems it does, but I wanted to double-check.)

If so, would this mean that the same data is loaded multiple times, once per parallel instance? Is there any way this can be prevented, and the data instead hashed and partitioned across the nodes?

 

Would using the operator state help?

OperatorState<MyList<String>> dataset;

In that case, what would the open function have to look like to initialize the data for this operator state?

 

 

I have tried to simply read a file and write it into the dataset, but I encountered a strange behavior: it looks as if the flatmap function gets executed before the open function, so the flatmap function works with an empty dataset, and the dataset only gets loaded after it has finished executing. Is this an error, or am I doing something wrong?

 

 

 

Dr. Radu Tudoran

Research Engineer

IT R&D Division

 


HUAWEI TECHNOLOGIES Duesseldorf GmbH

European Research Center

Riesstrasse 25, 80992 München

 

E-mail: [hidden email]

Mobile: +49 15209084330

Telephone: +49 891588344173

 

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany,
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN

This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!

 

From: Robert Metzger [mailto:[hidden email]]
Sent: Tuesday, December 01, 2015 6:21 PM
To: [hidden email]
Cc: Goetz Brasche
Subject: Re: Question about DataStream serialization

 

Hi Radu,

 

both emails reached the mailing list :)

 

You cannot reference DataSets or DataStreams from inside user-defined functions. Both are just abstractions for a data set or stream, so the elements are not really inside the set.

 

We don't have any support for mixing the DataSet and DataStream API.

 

For your use case, I would recommend using a RichFlatMapFunction and reading the text file in the open() call.
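
A minimal sketch of that approach (the file path, element types, and the combining logic below are placeholders, not from your program):

stream.flatMap(new RichFlatMapFunction<String, String>() {

    private List<String> fixedSet;

    @Override
    public void open(Configuration parameters) throws Exception {
        // runs once per parallel instance, before any flatMap() call
        fixedSet = new ArrayList<>();
        try (BufferedReader reader =
                 new BufferedReader(new FileReader("/path/to/fixedset.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                fixedSet.add(line);
            }
        }
    }

    @Override
    public void flatMap(String event, Collector<String> out) {
        // combine each incoming event with the pre-loaded fixed set
        for (String s : fixedSet) {
            out.collect(s + "," + event);
        }
    }
});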

 

 

 

On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran <[hidden email]> wrote:

 

Hello,

 

I am not sure whether this message was received on the user list; if so, I apologize for the duplicate.

 

I have the following scenario:

 

·         Reading a fixed set:

DataStream<String> fixedset = env.readTextFile(…

·         Reading a continuous stream of data:

DataStream<String> stream = …

 

For each event read from the continuous stream, I would need to perform some operations on it and on the fixed set together.

 

 

I have tried something like

 

final myObject.referenceStaticSet = fixedset;

stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String arg0) throws Exception {

        // for example: final String string2add = arg0;
        // the goal of the function below would be to add string2add to the fixedset
        myObject.referenceStaticSet = myObject.referenceStaticSet.flatMap(
            new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String arg0, Collector<String> arg1) {
                    // for example, adding the string2add object to the fixed set:
                    // arg1.collect(string2add);
                }
            });
        …
    }
});

 

However, I get an exception (Exception in thread "main" org.apache.flink.api.common.InvalidProgramException) saying that the object is not serializable (Object MyClass$3@a71081 not serializable).

 

Looking into this, I see that the issue is that the DataStream<> is not serializable. What would be the solution to this?

 

As I mentioned before, for each event from the continuous stream I would like to use the initial fixed set, add the event to it, and apply an operation.

Stephan mentioned at some point a possibility to create a DataSet and launch a batch job while operating in stream mode. If this is possible, can you give me a reference for it? It might be the right solution here: I am thinking I could keep the fixed set as a DataSet and, as each new event comes in, transform the event into a DataSet, join it with the reference set, and apply an operation.

 

Regards,

 

 

 

 


 

From: Vieru, Mihail [mailto:[hidden email]]
Sent: Tuesday, December 01, 2015 4:55 PM
To: [hidden email]
Subject: NPE with Flink Streaming from Kafka

 

Hi,

we get the following NullPointerException after ~50 minutes when running a streaming job with windowing and state that reads data from Kafka and writes the result to the local FS.

There are around 170 million messages to be processed; Flink 0.10.1 stops at ~8 million.

Flink runs locally, started with the "start-cluster-streaming.sh" script.


12/01/2015 15:06:24    Job execution switched to status RUNNING.
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
    at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
    at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)

Any ideas on what could cause this behaviour?

 

Best,

Mihail

 


Re: Question about DataStream serialization

Matthias J. Sax-2
Hi Radu,

you are right. The open() method is called for each parallel instance of
a rich function. Thus, if all instances use the same code, you might
read the same data multiple times.

The easiest way to distinguish the different instances within open() is
to use the RuntimeContext. It offers two methods, "int
getNumberOfParallelSubtasks()" and "int getIndexOfThisSubtask()", that
you can use to compute your own partitioning within open().

For example (just a sketch):

@Override
public void open(Configuration parameters) throws Exception {
  RuntimeContext context = super.getRuntimeContext();
  int dop = context.getNumberOfParallelSubtasks();
  int idx = context.getIndexOfThisSubtask();

  // open file
  // get size of file in bytes

  // seek to partition #idx:
  long seek = fileSize * idx / dop;

  // read "fileSize/dop" bytes
}
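
The same sketch filled in a bit (still just a sketch: it assumes a plain local file, splits by bytes, and ignores that records may be cut at the split boundaries):

@Override
public void open(Configuration parameters) throws Exception {
  RuntimeContext context = getRuntimeContext();
  int dop = context.getNumberOfParallelSubtasks();
  int idx = context.getIndexOfThisSubtask();

  // the path is a placeholder
  RandomAccessFile file = new RandomAccessFile("/path/to/data", "r");
  long fileSize = file.length();

  // this subtask reads the byte range [start, end)
  long start = fileSize * idx / dop;
  long end = fileSize * (idx + 1) / dop;

  file.seek(start);
  byte[] buffer = new byte[(int) (end - start)];
  file.readFully(buffer);
  file.close();

  // parse "buffer" into records here
}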

Hope this helps.

-Matthias



RE: Question about DataStream serialization

Radu Tudoran
Hi,

Thanks for the answer - it is helpful.
The issue that remains is why the open function is not executed before the flatmap, to load the data into the OperatorState.

I used something like the following, and I observe that the dataset is not initialized when it is used in the flatmap function:

env.socketTextStream(…)
    .map(…)   // transform the data to a Tuple1<String>
    .keyBy(0) // to enable the usage of the OperatorState, which I saw requires a keyed stream
    .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {

        private OperatorState<String> dataset;

        @Override
        public void flatMap(Tuple1<String> value, Collector<String> out) {
            // dataset is used here ... and is empty
        }

        @Override
        public void open(Configuration parameters) {
            // dataset is loaded here
        }
    });




Re: Question about DataStream serialization

Matthias J. Sax-2
Hi,

I think (but someone please verify) that an OperatorState is actually
not required -- I think that "open()" is called after a failure and
recovery, too. So you can use a regular member variable to store the
data instead of an OperatorState. In case of failure, you just re-read
the data as on regular start-up.
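
For example (again just a sketch, assuming the reference data fits in memory):

public class MyFunction extends RichFlatMapFunction<Tuple1<String>, String> {

  // a regular member variable instead of an OperatorState
  private transient List<String> dataset;

  @Override
  public void open(Configuration parameters) throws Exception {
    dataset = new ArrayList<>();
    // read the file into "dataset" here; after a failure and restart,
    // open() runs again and the data is simply re-read
  }

  @Override
  public void flatMap(Tuple1<String> value, Collector<String> out) {
    // "dataset" is initialized here, since open() runs before the first flatMap()
  }
}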

-Matthias



Re: Question about DataStream serialization

Aljoscha Krettek
Hi,
if the open() method is indeed not called before the first flatMap() call, then this would be a bug. Could you please verify that this is the case, and maybe provide an example where it is observable?
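
One quick way to check (a hypothetical minimal test, not taken from your job):

stream.flatMap(new RichFlatMapFunction<String, String>() {

  private boolean opened = false;

  @Override
  public void open(Configuration parameters) {
    opened = true;
  }

  @Override
  public void flatMap(String value, Collector<String> out) {
    if (!opened) {
      throw new IllegalStateException("flatMap() called before open()");
    }
    out.collect(value);
  }
});

If this ever throws, open() was indeed not called first.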

Cheers,
Aljoscha

> On 08 Dec 2015, at 10:41, Matthias J. Sax <[hidden email]> wrote:
>
> Hi,
>
> I think (but please someone verify) that an OperatorState is actually
> not required -- I think that "open()" is called after a failure and
> recovery, too. So you can use a regular member variable to store the
> data instead of an OperatorState. In case of failure, you just re-read
> the data as on regular start-up.
>
> -Matthias
>
>
> On 12/08/2015 09:38 AM, Radu Tudoran wrote:
>> Hi,
>>
>> Thanks for the answer - it is helpful.
>> The issue that remains is why is the open function not being executed before the flatmap to load the data in the OperatorState.
>>
>> I used something like - and I observe that the dataset is not initialized when being used in the flatmap function
>>
>> env.socketTextStream
>> .map() -> to transform data to a Tuple1<String>
>> .keyby(0) -> to enable the usage of the operatorState which I saw requires keyed structured
>> .flatmap(RichFlatMapFunction<Tuple1<String>, String>   -> the function
>> {
>> private OperatorState<String> dataset;
>> @Override
>> public void flatMap(
>> {
>> Dataset -> use ...is empty
>> }
>> @Override
>> public void open(
>> {
>> dataset -> load
>> }
>> })
>>
>>
>>
>> Dr. Radu Tudoran
>> Research Engineer
>> IT R&D Division
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> European Research Center
>> Riesstrasse 25, 80992 München
>>
>> E-mail: [hidden email]
>> Mobile: +49 15209084330
>> Telephone: +49 891588344173
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>> This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!
>>
>> -----Original Message-----
>> From: Matthias J. Sax [mailto:[hidden email]]
>> Sent: Tuesday, December 08, 2015 8:42 AM
>> To: [hidden email]
>> Subject: Re: Question about DataStream serialization
>>
>> Hi Radu,
>>
>> you are right. The open() method is called for each parallel instance of a rich function. Thus, if all instanced use the same code, you might read the same data multiple times.
>>
>> The easiest was to distinguish different instanced within open() is to user the RuntimeContext. If offers two methods  "int getNumberOfParallelSubtasks()" and "int getIndexOfThisSubtask()" that you can use to compute your own partitioning within open().
>>
>> For example (just a sketch):
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>  RuntimeContext context = super.getRuntimeContext();
>>  int dop = context.getNumberOfParallelSubtasks();
>>  int idx = context.getIndexOfThisSubtask();
>>
>>  // open file
>>  // get size of file in bytes
>>
>>  // seek to partition #idx:
>>  long seek = fileSize * idx / dop;
>>
>>  // read "fileSize/dop" bytes
>> }
>>
>> Hope this helps.
>>
>> -Matthias
>>
>>
>> On 12/08/2015 04:28 AM, Radu Tudoran wrote:
>>> Hi,
>>>
>>>
>>>
>>> Taking the example you mentioned of using RichFlatMapFunction and in
>>> the
>>> open() reading a file.
>>>
>>> Would this open function be executed on each node where the
>>> RichFlatMapFunction gets executed? (I have done some tests and I would
>>> get the feeling it does – but I wanted to double - check )
>>>
>>> If so, would this mean that the same data will be loaded multiple
>>> times on each parallel instance? Is there anyway, this can be
>>> prevented and the data to be hashed and partitioned somehow across nodes?
>>>
>>>
>>>
>>> Would using the operator state help?:
>>>
>>> “
>>>
>>> OperatorState*<*MyList<String>*>*dataset*;*
>>>
>>> ”
>>>
>>> I would be curious in this case how could the open function look like
>>> to initialize the data for this operator state:
>>>
>>>
>>>
>>>
>>>
>>> I have tried to just read a file and write it into the dataset, but I
>>> encountered a strange behavior that would look like the flatmap
>>> function gets executed before the open function, which leads to using
>>> an empty dataset in the flatmap function while when this finish
>>> executing the dataset gets loaded. Is this an error or I am doing something wrong?
>>>

Reply | Threaded
Open this post in threaded view
|

RE: Question about DataStream serialization

Radu Tudoran
Hi,

I attached below a function that shows the issue: the OperatorState does not have the value initialized in the open function by the time flatMap is called. You can see this because the print will not show anything. If, on the other hand, you remove the initialization loop and put a non-zero default value for the dataset, then the print will work.



// Imports and the wrapper class are assumed here so the snippet compiles stand-alone.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class OpenFunctionTest {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        DataStream<String> stream = env
                .socketTextStream("localhost", 16333, '\n')
                .map(new MapFunction<String, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> map(String arg0) throws Exception {
                        return new Tuple1<String>(arg0);
                    }
                }).keyBy(0)
                .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {

                    private OperatorState<Integer> dataset;

                    @Override
                    public void flatMap(Tuple1<String> arg0,
                            Collector<String> arg1) throws Exception {

                        // Observed issue: this never fires, although open() has run.
                        if (dataset.value() > 0)
                            arg1.collect("Test OK " + arg0);
                    }

                    @Override
                    public void open(Configuration parameters) throws Exception {

                        dataset = getRuntimeContext().getKeyValueState(
                                "loadeddata", Integer.class, 0);

                        /*
                         * Simulate loading data.
                         * It looks like if this part is commented out and the
                         * dataset is initialized with 1, for example, then the
                         * non-zero value is available in the flatMap function.
                         */
                        for (int i = 0; i < 10; i++) {
                            dataset.update(dataset.value() + 1);
                        }

                        // System.out.println("dataset value " + dataset.value());
                    }
                });

        stream.print();

        env.execute("test open function");
    }
}

Dr. Radu Tudoran
Research Engineer
IT R&D Division



-----Original Message-----
From: Aljoscha Krettek [mailto:[hidden email]]
Sent: Tuesday, December 08, 2015 12:14 PM
To: [hidden email]
Subject: Re: Question about DataStream serialization

Hi,
if the open() method is indeed not called before the first flatMap() call then this would be a bug. Could you please verify that this is the case and maybe provide an example where this is observable?

Cheers,
Aljoscha

> On 08 Dec 2015, at 10:41, Matthias J. Sax <[hidden email]> wrote:
>
> Hi,
>
> I think (but please someone verify) that an OperatorState is actually
> not required -- I think that "open()" is called after a failure and
> recovery, too. So you can use a regular member variable to store the
> data instead of an OperatorState. In case of failure, you just re-read
> the data as on regular start-up.
>
> -Matthias
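
A minimal sketch of that member-variable approach, assuming the reference data is a local text file of newline-separated entries; the path, class name, and matching logic are illustrative, not from the thread:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class EnrichingFlatMap extends RichFlatMapFunction<Tuple1<String>, String> {

    // Plain (non-checkpointed) field: open() runs before the first flatMap()
    // call and also after recovery, so the data can simply be re-read.
    private transient Set<String> referenceData;

    @Override
    public void open(Configuration parameters) throws Exception {
        referenceData = new HashSet<>();
        // Illustrative path; every parallel instance reads the file independently.
        try (BufferedReader reader = new BufferedReader(new FileReader("/tmp/fixedset.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                referenceData.add(line);
            }
        }
    }

    @Override
    public void flatMap(Tuple1<String> value, Collector<String> out) {
        // Visible for every key, because it is an ordinary field, not key-scoped state.
        if (referenceData.contains(value.f0)) {
            out.collect("Match: " + value.f0);
        }
    }
}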
>
>
> On 12/08/2015 09:38 AM, Radu Tudoran wrote:
>> Hi,
>>
>> Thanks for the answer - it is helpful.
>> The issue that remains is why the open function is not executed before the flatMap to load the data into the OperatorState.
>>
>> I used something like the following, and I observe that the dataset is
>> not initialized when used in the flatMap function:
>>
>> env.socketTextStream(...)
>>     .map(...)    // to transform data to a Tuple1<String>
>>     .keyBy(0)    // to enable the usage of the OperatorState, which I saw requires a keyed stream
>>     .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
>>         private OperatorState<String> dataset;
>>         @Override public void flatMap(...) { /* use dataset ... it is empty */ }
>>         @Override public void open(...)    { /* load dataset */ }
>>     })
>>
>>
>>
>>
>> -----Original Message-----
>> From: Matthias J. Sax [mailto:[hidden email]]
>> Sent: Tuesday, December 08, 2015 8:42 AM
>> To: [hidden email]
>> Subject: Re: Question about DataStream serialization
>>
>> Hi Radu,
>>
>> you are right. The open() method is called for each parallel instance of a rich function. Thus, if all instances use the same code, you might read the same data multiple times.
>>
>> The easiest way to distinguish the different instances within open() is to use the RuntimeContext. It offers two methods, "int getNumberOfParallelSubtasks()" and "int getIndexOfThisSubtask()", that you can use to compute your own partitioning within open().
>>
>> For example (just a sketch):
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>   RuntimeContext context = super.getRuntimeContext();
>>   int dop = context.getNumberOfParallelSubtasks();
>>   int idx = context.getIndexOfThisSubtask();
>>
>>   // open file
>>   // get size of file in bytes
>>
>>   // seek to partition #idx:
>>   long seek = fileSize * idx / dop;
>>
>>   // read "fileSize/dop" bytes
>> }
>>
>> Hope this helps.
>>
>> -Matthias
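
A fleshed-out version of that sketch as a minimal stand-alone class; the file path is an assumption, and the record-alignment caveat in the comments goes beyond the original pseudocode:

import java.io.RandomAccessFile;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;

public abstract class PartitionedLoader<I, O> extends RichFlatMapFunction<I, O> {

    // Raw bytes of this subtask's slice; subclasses consume it in flatMap().
    private transient byte[] myPartition;

    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext context = getRuntimeContext();
        int dop = context.getNumberOfParallelSubtasks();
        int idx = context.getIndexOfThisSubtask();

        try (RandomAccessFile file = new RandomAccessFile("/tmp/fixedset.txt", "r")) {
            long fileSize = file.length();
            long start = fileSize * idx / dop;     // seek to partition #idx
            long end = fileSize * (idx + 1) / dop; // exclusive end of this slice

            myPartition = new byte[(int) (end - start)];
            file.seek(start);
            file.readFully(myPartition);
            // A real implementation must re-align to record boundaries, e.g.
            // skip forward to the next '\n' after `start` and read past `end`
            // up to the next '\n', so that no record is split across subtasks.
        }
    }
}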

Reply | Threaded
Open this post in threaded view
|

Re: Question about DataStream serialization

Aljoscha Krettek
Ah, I see what the problem is. Operator state is scoped to the key of the incoming element. In the open() method, no element has been received yet, so the key of the incoming element is basically NULL. So the open() method initializes state for key NULL. In flatMap() we actually have a key for incoming elements, so we access state for a specific key, which has the default value “0” (from the getKeyValueState() call).

OperatorState is only useful if the state needs to be partitioned by key, but here it seems that the state is valid for all elements?
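
A condensed sketch of that scoping, using the fields from the program earlier in the thread; the comments mark which key the state is bound to at each point:

// Fragment of the RichFlatMapFunction from earlier in the thread.
@Override
public void open(Configuration parameters) throws Exception {
    // No element has arrived yet, so the current key is effectively NULL here.
    dataset = getRuntimeContext().getKeyValueState("loadeddata", Integer.class, 0);
    for (int i = 0; i < 10; i++) {
        dataset.update(dataset.value() + 1); // writes 1..10 under key NULL only
    }
}

@Override
public void flatMap(Tuple1<String> arg0, Collector<String> arg1) throws Exception {
    // The current key is now arg0's key: a fresh entry with default value 0,
    // so this condition is never true and nothing is printed.
    if (dataset.value() > 0) {
        arg1.collect("Test OK " + arg0);
    }
}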


Reply | Threaded
Open this post in threaded view
|

RE: Question about DataStream serialization

Radu Tudoran
Hi,

The state that is being loaded can very well be partitioned by keys. Assuming this scenario, and that you know that the keys go from 0 to N, is there some possibility to load and partition the initial data in the open function?


Dr. Radu Tudoran
Research Engineer
IT R&D Division





Reply | Threaded
Open this post in threaded view
|

Re: Question about DataStream serialization

Aljoscha Krettek
Hi,
it is not possible in an officially supported way. There is however a trick that you could use: You can cast the OperatorState to a KvState. This has a method setCurrentKey() that sets the key to be used when calling value() and update(). In this way you can trick the OperatorState into thinking that it has the key of an input element.

This is an internal API, however, and could change in the future, thereby breaking your program.

Cheers,
Aljoscha
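
As a rough sketch of that trick, applied to the keys-0-to-N scenario from the previous message. KvState is an internal class whose exact package and generics vary by version, and N and loadValueForKey() are hypothetical, so treat this as illustration only:

// Inside open() of the RichFlatMapFunction from earlier in the thread.
@Override
public void open(Configuration parameters) throws Exception {
    dataset = getRuntimeContext().getKeyValueState("loadeddata", Integer.class, 0);

    @SuppressWarnings({ "rawtypes", "unchecked" })
    KvState kvState = (KvState) dataset;      // internal API, may break in future versions

    for (int key = 0; key <= N; key++) {      // N = highest key, known in advance
        kvState.setCurrentKey(key);           // pretend an element with this key arrived
        dataset.update(loadValueForKey(key)); // hypothetical per-key initial value
    }
}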


Reply | Threaded
Open this post in threaded view
|

RE: Question about DataStream serialization

Radu Tudoran
Hi,

Is the partitioning function used by ".keyBy(Object)" of the form:

Object.hash % getNumberOfParallelSubtasks()

?



Dr. Radu Tudoran
Research Engineer
IT R&D Division



Reply | Threaded
Open this post in threaded view
|

Re: Question about DataStream serialization

Stephan Ewen
The object's hash is additionally scrambled using a typical hash function (like murmur hash) to guard against bad hash functions...
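
A toy illustration of that idea (not Flink's actual code): a murmur-style finalizer mixes all bits of a possibly weak hashCode() before the modulo is taken, so consecutive or clustered hash values still spread across subtasks.

public final class ScrambledPartitioner {

    // Murmur3 finalization step ("fmix32"): mixes every input bit into the output.
    static int scramble(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    static int partition(Object key, int numberOfParallelSubtasks) {
        // Mask the sign bit so the modulo result is never negative.
        return (scramble(key.hashCode()) & Integer.MAX_VALUE) % numberOfParallelSubtasks;
    }

    public static void main(String[] args) {
        // Integer keys 0..7 have consecutive raw hash codes; after scrambling
        // they no longer map to consecutive partitions.
        for (int key = 0; key < 8; key++) {
            System.out.println(key + " -> " + partition(key, 4));
        }
    }
}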

On Wed, Dec 9, 2015 at 2:37 AM, Radu Tudoran <[hidden email]> wrote:
Hi,

Is the partitioning function used by ".keyBy(Object)" of the form:

Object.hash % getNumberOfParallelSubtasks()

?





Reply | Threaded
Open this post in threaded view
|

Re: Question about DataStream serialization

Aljoscha Krettek
In reply to this post by Radu Tudoran
Right now, it is exactly "Object.hash % getNumberOfParallelSubtasks()"...

> On 09 Dec 2015, at 02:37, Radu Tudoran <[hidden email]> wrote:
>
> Object.hash % getNumberOfParallelSubtasks()
