Hi there,
Recently I ran a streaming benchmark with Flink 1.5.2 standalone on a cluster of 4 machines (1 master and 3 workers), and it gives different results as below:

(1) With the parallelism set to 96 (source, sink, and the middle operator all set to 96), starting 3 TaskManagers with 32 slots each, all goes well.

(2) Changing (1) to start 6 TaskManagers (2 TaskManagers on each worker, 16 slots each), all goes well too. In this situation, however, I find that the subtasks on each worker process the same data size, but one worker processes several times more data than the others; it seems data skew occurs.

How could this happen? Could someone explain the performance difference, at the same parallelism, between running multiple TaskManagers per worker with fewer slots and one TaskManager with more slots?

Thanks a lot!

Best Regards
Rui

-----
stay hungry, stay foolish.
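P.S. For reference, the only thing I change between (1) and (2) is the slot configuration; in flink-conf.yaml terms it would look roughly like this (a sketch, with the other settings omitted):

  # setup (1): 3 TaskManagers, 32 slots each
  taskmanager.numberOfTaskSlots: 32
  parallelism.default: 96

  # setup (2): 6 TaskManagers (2 per worker), 16 slots each
  taskmanager.numberOfTaskSlots: 16
  parallelism.default: 96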
Hi Rui,

such a situation can occur if you have data skew in your data set (differently sized partitions when you key by some key). Assume you have 2 TMs with 2 slots each and you key your data by some key x. The partition assignment could look like:

  TM1: slot_1 = Partition_1, slot_2 = Partition_2
  TM2: slot_1 = Partition_3, slot_2 = Partition_4

Now assume that Partition_1 and Partition_3 are ten times bigger than Partition_2 and Partition_4. From a TM perspective, both TMs would process the same amount of data. If you now start 4 TMs with a single slot each, you could get the following assignment:

  TM1: slot_1 = Partition_1
  TM2: slot_1 = Partition_2
  TM3: slot_1 = Partition_3
  TM4: slot_1 = Partition_4

Now, from a TM perspective, TM1 and TM3 would process ten times more data than TM2 and TM4. Does this make sense?

What you could check is whether you can detect such a data skew in your input data (e.g. by counting the occurrences of items with a specific key).

Cheers,
Till
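P.S. A minimal sketch of such a key-occurrence check with the Scala DataStream API (the source below is just a stand-in for your actual benchmark input):

  import org.apache.flink.streaming.api.scala._

  object KeyDistributionCheck {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      // Stand-in source; in your case this would be the EventGenerator stream.
      val keys: DataStream[String] = env.fromElements("a", "a", "a", "b", "c")
      keys
        .map(k => (k, 1L)) // pair each event's key with a count of 1
        .keyBy(0)          // partition by key, as your job does
        .sum(1)            // running count per key
        .print()           // heavily hit keys show much larger counts
      env.execute("key distribution check")
    }
  }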
Hi Till,
It's very kind of you to reply. I got your point; I'm sorry for not making my issue clear. I generate the data with the streaming benchmark, just as in this link: https://github.com/dataArtisans/databricks-benchmark/blob/master/src/main/scala/com/databricks/benchmark/flink/EventGenerator.scala

What I want to say is: keep the parallelism the same (assume 96) and only change the number of TMs and slots per TM. In the first test, configured with 3 TMs and 32 slots/TM, no data skew occurs: the three machines receive the same amount of data and each partition processes approximately the same amount. In the second test, configured with 6 TMs and 16 slots/TM, each partition processes the same amount of data too, but one machine processes more data than the other two. I wonder whether the TaskManagers (JVMs) compete with each other on one machine?

What's more, how does the streaming benchmark deal with backpressure? I test on a cluster with 4 nodes, one for the master and three for workers, each node with an Intel Xeon E5-2699 v4 @ 2.20GHz/3.60GHz, 256 GB memory, 88 cores, and a 10 Gbps network, and I could not find the bottleneck. It confuses me!

Best Regards & Thanks
Rui

-----
stay hungry, stay foolish.
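P.S. For the backpressure question: is it correct to check it like this via the REST API (a sketch; host, job id, and vertex id below are placeholders, and I'm assuming the back-pressure endpoint is available in 1.5)?

  # list the TaskManagers and their registered slots
  curl http://<jobmanager-host>:8081/taskmanagers

  # sample back pressure for one operator (vertex) of the running job
  curl http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure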
Hi,

could you tell me how exactly you started the cluster and with which parameters (configured memory, maybe vcores, etc.)?

Cheers,
Till
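P.S. For example, something along these lines would already help (a sketch of what I mean; the values are placeholders):

  # flink-conf.yaml on each worker
  taskmanager.heap.mb: 4096          # your configured TaskManager heap
  taskmanager.numberOfTaskSlots: 32  # 32 in your first setup, 16 in the second
  parallelism.default: 96

  # and how the processes were launched, e.g.
  ./bin/start-cluster.sh
  # or per worker:
  ./bin/taskmanager.sh start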