import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
import org.hpi.esb.flink.datamodel.{SimpleRecord, Statistics}

// Computes per-key statistics over tumbling time windows of `windowSize` milliseconds.
// `Query` is a trait from the same project (its definition is not shown here).
class StatisticsQuery(windowSize: Int)
  extends Query[(String, SimpleRecord), (String, Statistics)] {

  override def execute(stream: DataStream[(String, SimpleRecord)]): DataStream[(String, Statistics)] = {
    stream
      .keyBy(_._1)                                // partition the stream by its String key
      .timeWindow(Time.milliseconds(windowSize))  // tumbling window of windowSize ms
      .fold(("", new Statistics())) { (acc, value) => Statistics.fold(acc, value) }
  }
}
Hi everybody,
I have the following Flink/Kafka setup:
I have 3 Kafka "input" topics and 3 "output" topics, each with 1 partition (only 1 partition because the order of the messages is important). I also have 1 master and 2 Flink slave nodes with a total of 16 task slots.
In my Flink program I have created 3 consumers, one for each input topic.
On each of the data streams I run a query that generates statistics over a 1-second window, and I write the result to the corresponding output topic. The execution plan with parallelism set to 2 is attached.
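To make the per-window statistics step concrete, here is a plain-Scala sketch (no Flink dependency) of what a keyed tumbling window plus a fold computes: each record is assigned to the window starting at `ts - ts % size`, and the records of each (key, window) pair are folded into one result. The `Stats` fields (count/min/max/sum) are a hypothetical stand-in, since the real `Statistics` class is not shown, and non-negative millisecond timestamps are assumed.

```scala
object TumblingWindowSketch {
  // Hypothetical stand-in for the poster's Statistics class.
  final case class Stats(count: Long, min: Long, max: Long, sum: Long) {
    def add(v: Long): Stats = Stats(count + 1, math.min(min, v), math.max(max, v), sum + v)
  }

  val EmptyStats: Stats = Stats(0L, Long.MaxValue, Long.MinValue, 0L)

  // Start of the tumbling window containing `ts` (no offset, non-negative timestamps assumed).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Group (key, timestamp, value) records by key and window start, then fold each group.
  def aggregate(records: Seq[(String, Long, Long)], sizeMs: Long): Map[(String, Long), Stats] =
    records
      .groupBy { case (key, ts, _) => (key, windowStart(ts, sizeMs)) }
      .map { case (k, rs) => k -> rs.foldLeft(EmptyStats) { case (acc, (_, _, v)) => acc.add(v) } }
}
```

With a 1000 ms window, records of key "a" at 100 ms and 900 ms fall into the window starting at 0, while a record at 1200 ms starts a new window at 1000.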
With parallelism=2, this setup sometimes seems to produce the statistics results in the wrong order. I assume this is caused by the rebalancing before the last map, which leads to a race condition when writing to Kafka.
If I set the parallelism to 1, no rebalancing is done, but only one task slot is used.
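The suspected race can be illustrated with a small plain-Scala model (not Flink's network stack): a rebalance spreads records round-robin over `n` channels, each channel is FIFO, but the order in which channels deliver to a shared sink depends on scheduling. Flink's documentation describes order as preserved only within each pair of sending and receiving subtasks, which matches this model. The simplification "record i goes to channel i % n" and the `schedule` parameter are assumptions for illustration.

```scala
object RebalanceOrderSketch {
  // Round-robin distribution: record i goes to channel i % n (simplified rebalance).
  def roundRobin[T](records: Seq[T], n: Int): Vector[Vector[T]] =
    (0 until n).toVector.map { c =>
      records.zipWithIndex.collect { case (r, i) if i % n == c => r }.toVector
    }

  // One possible arrival order at a shared sink. Each channel is FIFO, but
  // `schedule` (a list of channel ids) decides which channel delivers next.
  def interleave[T](channels: Vector[Vector[T]], schedule: Seq[Int]): Vector[T] = {
    val cursors = Array.fill(channels.size)(0)
    schedule.toVector.flatMap { c =>
      if (cursors(c) < channels(c).size) {
        val r = channels(c)(cursors(c))
        cursors(c) += 1
        Some(r)
      } else None
    }
  }
}
```

With two channels, records 1 to 6 can reach the sink as 2, 4, 6, 1, 3, 5 if the second channel happens to run ahead; with a single channel, every schedule yields 1 to 6 in order, which is consistent with the behavior observed at parallelism 1.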
This has led me to the following questions:
Why is only 1 task slot used for my 3 pipelines when the parallelism is set to 1? As far as I understand, the parallelism refers to the number of parallel instances a task can be split into. Therefore, I would assume that I could still run multiple different tasks (e.g. different maps or window functions on different streams) in different task slots, right?
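A possible explanation for the single slot is Flink's slot sharing: by default all operators belong to one slot sharing group, and subtasks of different operators in the same group may share a slot, so a group needs as many slots as its highest operator parallelism, and separate groups need separate slots. Flink's DataStream API lets an operator be placed in a named group via `slotSharingGroup(...)`. The plain-Scala sketch below only models that counting rule; the `Operator` type and the group names are hypothetical, and newer Flink resource-management modes may behave differently.

```scala
object SlotCountSketch {
  // A hypothetical operator description: name, parallelism, and slot sharing group.
  final case class Operator(name: String, parallelism: Int, slotSharingGroup: String = "default")

  // Each slot sharing group needs as many slots as its highest operator parallelism;
  // different groups do not share slots, so the per-group maxima are summed.
  def requiredSlots(ops: Seq[Operator]): Int =
    ops.groupBy(_.slotSharingGroup).values.map(_.map(_.parallelism).max).sum
}
```

Under this rule, three source/window/sink pipelines at parallelism 1 in the default group need 1 slot, while putting each pipeline in its own group would need 3.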
And to come back to my requirement: is it not possible to run all 3 pipelines in parallel and still keep the order of the messages and results?
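One property relevant to this requirement: if records are routed by key instead of round-robin, every record of a given key travels through a single FIFO channel, so the per-key order survives however the channels interleave. The plain-Scala sketch below models only that principle; the modulo-of-`hashCode` routing is a simplification (Flink's actual key-to-subtask assignment differs), and the topic-like keys are hypothetical.

```scala
object KeyedOrderSketch {
  // Hash partitioning: all records with the same key land in the same channel.
  def hashPartition[K, V](records: Seq[(K, V)], n: Int): Vector[Vector[(K, V)]] =
    (0 until n).toVector.map { c =>
      records.filter { case (k, _) => Math.floorMod(k.hashCode, n) == c }.toVector
    }

  // Drain FIFO channels in the order given by `schedule` (a list of channel ids).
  def interleave[T](channels: Vector[Vector[T]], schedule: Seq[Int]): Vector[T] = {
    val cursors = Array.fill(channels.size)(0)
    schedule.toVector.flatMap { c =>
      if (cursors(c) < channels(c).size) {
        cursors(c) += 1
        Some(channels(c)(cursors(c) - 1))
      } else None
    }
  }

  // Values observed for one key, in arrival order at the sink.
  def valuesFor[K, V](arrived: Seq[(K, V)], key: K): Seq[V] =
    arrived.collect { case (k, v) if k == key => v }
}
```

The global order across keys can still change, but each key's own sequence arrives intact.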
I also asked these questions on Stack Overflow, and it seems that I have similar trouble understanding the terms "task slot", "subtask", etc. as Flavio mentioned in this Flink mail thread.
Thank you and I would appreciate any input!
Best regards,
Ben