Join Stream with big ref table


Join Stream with big ref table

LINZ, Arnaud

Hello,

 

I have to enrich a stream with a big reference table (11,000,000 rows). I cannot use "join" because I cannot window the stream; so in the open() function of each mapper I read the contents of the table and put them in a HashMap (stored on the heap).

 

11M rows is quite big, but it should take less than 100 MB in RAM, so it's supposed to be easy. However, I systematically run into a Java OutOfMemoryError, even with huge 64 GB containers (5 slots per container).
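The pattern described above can be sketched in plain Java as follows. This is only an illustrative analogue, not actual Flink API: in a real job, open() would be RichMapFunction.open(Configuration) and map() the stream transformation; the class and method signatures here are made up for the sketch.

```java
// Plain-Java sketch of the enrichment pattern: load the reference
// table once in open(), then look up each stream element in map().
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RefTableEnricher {
    private Map<String, String> refTable;

    // In Flink this would be RichMapFunction.open(): called once per
    // parallel task instance before any element is processed.
    public void open(List<String> refTableLines) {
        refTable = new HashMap<>();
        for (String line : refTableLines) {
            String[] parts = line.split("\t", 2);   // key<TAB>value
            refTable.put(parts[0], parts[1]);
        }
    }

    // In Flink this would be map(): enrich the element with the
    // looked-up reference value.
    public String map(String key) {
        return key + ":" + refTable.getOrDefault(key, "UNKNOWN");
    }
}
```

Note that with several slots per TaskManager, each parallel task instance builds its own copy of the map on the same shared JVM heap.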

 

Path, ID:             akka.tcp://flink@172.21.125.28:43653/user/taskmanager, 4B4D0A725451E933C39E891AAE80B53B
Data Port:            41982
Last Heartbeat:       2015-11-12, 17:46:14
All Slots:            5
Free Slots:           5
CPU Cores:            32
Physical Memory:      126.0 GB
Free Memory:          46.0 GB
Flink Managed Memory: 31.5 GB

 

I don’t clearly understand why this happens and how to fix it. Any clue?

 

 

 





The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.
RE: Join Stream with big ref table

LINZ, Arnaud

Hello,

 

I've worked around my problem by not using the HiveServer2 JDBC driver to read the reference table. Apparently, despite all the right options passed to the Statement object, it handles RAM poorly: converting the table to text format and reading it directly from HDFS works without any problem and with plenty of free memory…
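The workaround above can be sketched as follows: stream a text-format dump of the table line by line, so that only the map itself (and never a driver-side result buffer) occupies the heap. This is a hypothetical sketch; in the real job the Reader would come from the HDFS FileSystem API, and a local Reader stands in for it here.

```java
// Load a key<TAB>value text dump into a HashMap, reading one line at
// a time so no intermediate result set is buffered in memory.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.HashMap;
import java.util.Map;

public class TextDumpLoader {
    public static Map<String, String> load(Reader source) throws IOException {
        Map<String, String> table = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                int tab = line.indexOf('\t');   // key<TAB>value per line
                if (tab < 0) continue;          // skip malformed lines
                table.put(line.substring(0, tab), line.substring(tab + 1));
            }
        }
        return table;
    }
}
```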

 

Greetings,

Arnaud

 

Re: Join Stream with big ref table

rmetzger0
Hi Arnaud,

I'm happy that you were able to resolve the issue. If you are still interested in the first approach, there are a few things you could try, for example using only one slot per task manager (all slots share the heap of the TM).

Regards,
Robert
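Robert's suggestion can be applied through the TaskManager slot setting in flink-conf.yaml; with a single slot per TaskManager, only one copy of the reference map lives on each JVM heap. A minimal fragment (the option name as used in Flink releases of that era):

```yaml
# One slot per TaskManager, so each JVM heap holds only one copy of
# the per-task reference map.
taskmanager.numberOfTaskSlots: 1
```

The trade-off is that fewer slots per container means more containers for the same parallelism.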


Re: Join Stream with big ref table

Stephan Ewen
I think this pattern may be common, so some tooling to share such a table across multiple tasks may make sense.

It would be nice to add a handle to which you give an "initializer" that reads the data and builds the shared lookup map. The first task to acquire the handle actually initializes the data set (reads the file, creates the table), and all successive ones acquire the same handle (possibly blocking while it is not yet done).

The broadcast variables in the batch API do exactly that, BTW. 
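The "initializer handle" idea sketched above could look like this in plain Java. The class and method names are made up for illustration; this is not a Flink API, just one way to get first-caller-initializes, everyone-else-blocks semantics within a JVM.

```java
// Shared lookup handle: the first caller registers the initializer and
// triggers the load; concurrent callers block on the same future until
// the load finishes; later callers get the cached map immediately.
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class SharedLookupHandle {
    // One future per logical table name, shared by all tasks in the JVM.
    private static final ConcurrentHashMap<String, CompletableFuture<Map<String, String>>>
            HANDLES = new ConcurrentHashMap<>();

    public static Map<String, String> acquire(String name,
                                              Supplier<Map<String, String>> initializer) {
        // computeIfAbsent guarantees the initializer is registered at most
        // once per name; join() blocks every caller until the single load
        // has completed, then returns the same shared map.
        return HANDLES.computeIfAbsent(name,
                n -> CompletableFuture.supplyAsync(initializer)).join();
    }
}
```

With this shape, five slots on one TaskManager would hold one map instead of five.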

