Correlation between data streams/operators and threads


Shailesh Jain
Hi,

I'm trying to understand the runtime aspect of Flink when dealing with multiple data streams and multiple operators per data stream.

Use case: N data streams in a single Flink job (each data stream representing one device, with different time latencies). Each of these data streams is split into two streams, one of which goes into a set of CEP operators and the other into a process function.

Questions:
1. At runtime, will the engine create one thread per data stream? Or one thread per operator?
2. Is it possible to dynamically create a data stream at runtime when the job starts? (i.e. if N is read from a file when the job starts and corresponding N streams need to be created)
3. Are there any specific performance impacts when a large number of streams (N ~ 10000) are created, as opposed to N partitions within a single stream?

Are there any internal (design) documents which can help understanding the implementation details? Any references to the source will also be really helpful.

Thanks in advance.

Shailesh



Re: Correlation between data streams/operators and threads

Piotr Nowojski
Hi,

1. The number of executing threads will be, roughly speaking, equal to the number of input data streams multiplied by the parallelism.

2. Yes, you can dynamically create more data streams at job startup.

3. Running 10000 independent data streams on a small cluster (a couple of nodes) will definitely be an issue, since even with parallelism set to 1 there would be quite a lot of unnecessary threads.

It would be much better to treat your data as a single input data stream with multiple partitions. You could assign partitions to source instances based on parallelism. For example, with parallelism 6:
- source 0 could get partitions 0, 6, 12, 18
- source 1 could get partitions 1, 7, …
- source 5 could get partitions 5, 11, …
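This round-robin assignment is just modular arithmetic; a minimal sketch in plain Java (the class and method names are made up for illustration, this is not a Flink API):

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignment {
    // Round-robin assignment: subtask i of `parallelism` subtasks handles
    // partitions i, i + parallelism, i + 2 * parallelism, ...
    static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = subtaskIndex; p < numPartitions; p += parallelism) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // With parallelism 6 and 24 partitions, source 0 gets 0, 6, 12, 18.
        System.out.println(partitionsFor(0, 6, 24)); // prints [0, 6, 12, 18]
    }
}
```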

Piotrek



Re: Correlation between data streams/operators and threads

Shailesh Jain
On 1. - is it tied specifically to the number of source operators, or to the number of DataStream objects created? I mean, does the answer change if I read all the data from a single Kafka topic, get a DataStream of all events, and then apply N filters to create N individual streams?

On 3. - the problem with partitions is that watermarks cannot differ per partition, and since in this use case each stream comes from a device, the latency can differ (though the order will almost always be correct), and there is a high chance of losing events in operators like Patterns which work with windows. Any ideas for workarounds here?


Thanks,
Shailesh




Re: Correlation between data streams/operators and threads

Piotr Nowojski
1. It’s a little bit more complicated than that. Each operator chain/task is executed in a separate thread (parallelism multiplies that). You can check in the web UI how your job was split into tasks.

3. Yes, that’s true; this is an issue. To preserve the individual watermarks/latencies (assuming that you have some way to calculate them individually per device), you could either:

a) have a separate job per device, each with parallelism 1. Pros: independent failures/checkpoints. Cons: resource usage (the number of threads grows with the number of devices, and each job consumes other resources too) and efficiency.
b) have one job with multiple data streams. Cons: resource usage (threads).
c) ignore Flink’s watermarks and implement your own logic in their place. You could read all of your data in a single data stream, keyBy partition/device, and handle the watermark logic manually. You could either try to wrap the CEP/Window operators, or copy/paste and modify them to suit your needs.

I would start with a). If it works for your cluster/scale, that’s fine. If not, try b) (it would share most of the code with a)), and as a last resort try c).
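As a toy illustration of the idea behind option c), here is per-key watermark tracking in plain Java. All names here are invented for the sketch; this is not a Flink API, and inside Flink this logic would live in keyed state in a process function:

```java
import java.util.HashMap;
import java.util.Map;

// Track event time per device key instead of one global watermark, so a
// slow device does not hold back (or lose events of) a fast one.
class PerKeyWatermarks {
    private final long maxOutOfOrdernessMs;
    private final Map<String, Long> maxTimestampPerKey = new HashMap<>();

    PerKeyWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Record an event for a device and return that device's current watermark:
    // the max timestamp seen for this key minus the allowed out-of-orderness.
    long onEvent(String deviceId, long eventTimestampMs) {
        long max = Math.max(
                maxTimestampPerKey.getOrDefault(deviceId, eventTimestampMs),
                eventTimestampMs);
        maxTimestampPerKey.put(deviceId, max);
        return max - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        PerKeyWatermarks wm = new PerKeyWatermarks(1000);
        System.out.println(wm.onEvent("dev-1", 5000)); // prints 4000
        System.out.println(wm.onEvent("dev-1", 4500)); // late event: still prints 4000
    }
}
```

Window/CEP logic wrapped this way would fire off the per-key watermark rather than the operator-wide one.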

Kostas, would you like to add something?

Piotrek



Re: Correlation between data streams/operators and threads

Shailesh Jain
Thanks, Piotr. I'll try it out and will get back in case of any further questions.

Shailesh



Re: Correlation between data streams/operators and threads

Piotr Nowojski
Sure, let us know if you have other questions or encounter some issues.

Thanks, Piotrek



Re: Correlation between data streams/operators and threads

Shailesh Jain
Hi Piotrek,

I tried out option a) mentioned above, but instead of separate jobs, I'm creating separate streams per device. I'm running a local cluster on an 8 GB RAM, 2.5 GHz i5 Ubuntu machine, with the following test deployment configuration:

akka.client.timeout 15 min
jobmanager.heap.mb 1024
jobmanager.rpc.address localhost
jobmanager.rpc.port 6123
jobmanager.web.port 8081
metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port 8789
metrics.reporters jmx
parallelism.default 1
taskmanager.heap.mb 1024
taskmanager.memory.preallocate false
taskmanager.numberOfTaskSlots 4

The number of operators per device stream is 4 (one sink function and three CEP operators).

Observations (and questions):

1. The number of threads (captured through JMX) is almost the same as the total number of operators created. This clears up my original question in this thread.

2. Even though the number of task slots is 4, the web UI shows 3 slots as free. Is this expected? Why are the subtasks not being distributed across slots?

3. Job deployment hangs (never switches to RUNNING) when the number of devices is greater than 5. Increasing the akka client timeout does not help. Would deploying a separate job per device, instead of separate streams, help here?

4. Is there an upper limit on the number of task slots that can be configured? I know that my operator state size at any given point in time would not be very high, so it looks OK to deploy independent jobs on the same task manager across slots.

Thanks,
Shailesh




Re: Correlation between data streams/operators and threads

Piotr Nowojski
Hi,

1. I'm not sure what your code looks like. However, I have tested a job with multiple streams: as expected, it created 5 source threads (checked in the debugger) and printed 5 values to the output every second, so those 5 sources are clearly executed simultaneously.
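The inline example is not reproduced in the archive; the following untested sketch shows the kind of job described (five independent source -> print pipelines, each emitting one value per second), not the original code:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MultipleStreamsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Five independent data streams in one job; each source runs in its own thread.
        for (int i = 0; i < 5; i++) {
            final int sourceId = i;
            env.addSource(new SourceFunction<String>() {
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<String> ctx) throws Exception {
                    while (running) {
                        ctx.collect("source " + sourceId + ": " + System.currentTimeMillis());
                        Thread.sleep(1000);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            }).print();
        }

        env.execute("multiple-streams");
    }
}
```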

The number of operators is not related to the number of threads; the number of operator chains is. Simple pipelines like source -> map -> filter -> sink will be chained and executed in one thread. Please refer to the documentation link in one of my earlier responses.

Can you share your job code?

2. Good point, I forgot to mention that. The job in my example has 5 operator chains, but because of task slot sharing they will all share a single task slot. To distribute such a job with parallelism 1 across the cluster, you have to define a different slot sharing group for each chain; just set it on the sources.
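A hypothetical fragment sketching this (`DeviceSource` and the downstream step are made-up names; `slotSharingGroup` is the actual DataStream API call):

```java
for (int i = 0; i < numDevices; i++) {
    env.addSource(new DeviceSource(devices.get(i)))
       // Each chain gets its own slot sharing group; downstream operators
       // inherit the group of their inputs by default.
       .slotSharingGroup("device-" + i)
       .process(new DeviceProcessFunction())
       .print();
}
```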

3. Can you show the logs from job manager and task manager?

4. As long as you have enough heap memory to run your application/tasks, there is no upper limit on the number of task slots.

Piotrek 



Re: Correlation between data streams/operators and threads

Shailesh Jain
1. Okay, I understand. My code is similar to what you demonstrated. I have attached a snapshot of my job plan visualization.

3. I have attached the logs and the exception raised (after the configured 15 min akka client timeout) after submitting the job.

Thanks,
Shailesh


On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

1. 
I’m not sure what your code looks like. However, I have tested this, and here is an example with multiple streams in one job:
As expected, it created 5 source threads (checked in the debugger) and is printing 5 values to the output every second, so clearly those 5 sources are executed simultaneously.

Number of operators is not related to the number of threads; the number of operator chains is. Simple pipelines like source -> map -> filter -> sink will be chained and executed in one thread. Please refer to the documentation link in one of my earlier responses.
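Why a chain runs in one thread can be illustrated with a plain-Java sketch (names are illustrative, not Flink's API): a chained source -> map -> filter -> sink is effectively nested calls, with a single thread driving each record through the whole pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class ChainSketch {
    static List<Integer> runChain(List<Integer> sourceRecords) {
        Function<Integer, Integer> map = x -> x * 2; // "map" operator
        Predicate<Integer> filter = x -> x > 4;      // "filter" operator
        List<Integer> sink = new ArrayList<>();      // "sink" operator
        // One thread pushes each record through the whole chain in a single call stack.
        for (int record : sourceRecords) {
            int mapped = map.apply(record);
            if (filter.test(mapped)) {
                sink.add(mapped);
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(runChain(List.of(1, 2, 3))); // [6]
    }
}
```

Chained Flink operators similarly invoke each other directly within one task thread, which is why operator count alone does not predict thread count.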

Can you share your job code?

2. Good point, I forgot to mention that. The job in my example will have 5 operator chains, but because of task slot sharing, they will all share one single task slot. In order to distribute such a job with parallelism 1 across the cluster, you have to define a different slot sharing group for each chain:
Just set it on the sources.

3. Can you show the logs from job manager and task manager?

4. As long as you have enough heap memory to run your application/tasks, there is no upper limit on the number of task slots.

Piotrek 

On 14 Nov 2017, at 07:26, Shailesh Jain <[hidden email]> wrote:

Hi Piotrek,

I tried out option 'a' mentioned above, but instead of separate jobs, I'm creating separate streams per device. The following is the test deployment configuration for a local cluster (8 GB RAM, 2.5 GHz i5, Ubuntu machine):

akka.client.timeout 15 min
jobmanager.heap.mb 1024
jobmanager.rpc.address localhost
jobmanager.rpc.port 6123
jobmanager.web.port 8081
metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port 8789
metrics.reporters jmx
parallelism.default 1
taskmanager.heap.mb 1024
taskmanager.memory.preallocate false
taskmanager.numberOfTaskSlots 4

The number of operators per device stream is 4 (one sink function, 3 CEP operators).

Observations (and questions):

1. The number of threads (captured through JMX) is almost the same as the total number of operators being created. This clears up my original question in this thread.

2. Even when the number of task slots is 4, the web UI shows 3 slots as free. Is this expected? Why are the subtasks not being distributed across slots?

3. Job deployment hangs (never switches to RUNNING) when the number of devices is greater than 5. Even increasing the akka client timeout does not help. Would deploying separate jobs per device, instead of separate streams, help here?

4. Is there an upper limit on the number of task slots which can be configured? I know that my operator state size at any given point in time would not be very high, so it looks OK to deploy independent jobs on the same task manager across slots.

Thanks,
Shailesh


On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <[hidden email]> wrote:
Sure, let us know if you have other questions or encounter some issues.

Thanks, Piotrek


On 13 Nov 2017, at 14:49, Shailesh Jain <[hidden email]> wrote:

Thanks, Piotr. I'll try it out and will get back in case of any further questions.

Shailesh

On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <[hidden email]> wrote:
1. It’s a little bit more complicated than that. Each operator chain/task will be executed in a separate thread (parallelism multiplies that). You can check in the web UI how your job was split into tasks.

3. Yes, that’s true; this is an issue. To preserve the individual watermarks/latencies (assuming that you have some way to calculate them individually for each device), you could:

a) have separate jobs, one per device, with parallelism 1. Pros: independent failures/checkpoints. Cons: resource usage (the number of threads increases with the number of devices, and each job consumes other resources too), efficiency.
b) have one job with multiple data streams. Cons: resource usage (threads).
c) ignore Flink’s watermarks and implement your own logic in place of them. You could read all of your data in a single data stream, keyBy partition/device, and handle the watermark logic manually. You could either try to wrap the CEP/Window operators or copy/paste and modify them to suit your needs.

I would start by trying a). If it works for your cluster/scale, that’s fine. If not, try b) (it would share most of the code with a), and as a last resort try c).
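For option c), the per-device watermark bookkeeping could look roughly like this plain-Java sketch (class and method names are hypothetical; real code would live inside a keyed Flink operator): track the maximum event timestamp per device key and derive an independent watermark per device, instead of relying on the stream's single global watermark.

```java
import java.util.HashMap;
import java.util.Map;

public class PerDeviceWatermarks {
    private final long allowedLatenessMs;
    private final Map<String, Long> maxTimestampPerDevice = new HashMap<>();

    public PerDeviceWatermarks(long allowedLatenessMs) {
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // Called for every event; returns the device's current watermark.
    public long onEvent(String deviceId, long eventTimestampMs) {
        // Watermarks only advance: keep the max timestamp seen per device.
        maxTimestampPerDevice.merge(deviceId, eventTimestampMs, Math::max);
        return maxTimestampPerDevice.get(deviceId) - allowedLatenessMs;
    }

    public static void main(String[] args) {
        PerDeviceWatermarks wm = new PerDeviceWatermarks(1);
        System.out.println(wm.onEvent("device-0", 100)); // 99
        System.out.println(wm.onEvent("device-1", 50));  // 49: independent of device-0
        System.out.println(wm.onEvent("device-0", 90));  // 99: a late event does not regress it
    }
}
```

A slow device then only holds back its own windows/patterns, not those of every other device sharing the stream.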

Kostas, would you like to add something?

Piotrek

On 9 Nov 2017, at 19:16, Shailesh Jain <[hidden email]> wrote:

On 1. - is it tied specifically to the number of source operators or to the number of DataStream objects created? I mean, does the answer change if I read all the data from a single Kafka topic, get a DataStream of all events, and then apply N filters to create N individual streams?

On 3. - the problem with partitions is that watermarks cannot differ per partition, and since in this use case each stream is from a device, the latency could be different (though the order will be correct almost always), so there is a high chance of losing events in operators like Patterns, which work with windows. Any ideas for workarounds here?


Thanks,
Shailesh

Screenshot-2017-11-14 Flink Plan Visualizer.png (86K) Download Attachment
flink-shailesh-jobmanager-0-shailesh.log (28K) Download Attachment
Exception (5K) Download Attachment

Re: Correlation between data streams/operators and threads

Piotr Nowojski
1. It seems like you have one single data source, not one per device. That might make a difference: a single data source followed by a co-map might create one single operator chain. If you want to go this way, please use my suggested solution c), since you will have trouble handling watermarks anyway with a single data source.

3. Nico, can you take a look at this one? Isn’t this a blob server issue?

Piotrek


Re: Correlation between data streams/operators and threads

Shailesh Jain
1. It's a single data source because I have one Kafka topic where all events get published. But I am creating multiple data streams by applying a series of filter operations on the single input stream to generate device-specific streams, and then assigning watermarks on each of those streams. Will this not result in the downstream operators (for a particular device-specific stream) getting correct device-specific watermarks?

Job code:

// eventStream initially contains all events from all devices

        for (int i = 0; i < TOTAL_DEVICES; i++) {
            DataStream<Event> deviceOnlyEvents = eventStream.filter(new DeviceFilter(i))
                    .assignTimestampsAndWatermarks(new EventTimeStampExtractor(Time.milliseconds(1))).setParallelism(1);
            // apply CEP operators, and generate derived events
            DataStream<Event> derivedEvents = PatternCreator.addPatternsOnStream(deviceOnlyEvents, appId);
            // also pass the stream through a process function (this gets chained with the source operator as you had mentioned above)
            DataStream<Event> stateTransitionEvents = deviceOnlyEvents.process(new StateMachineOperator(appId)).setParallelism(1);
            // add sink to the new event streams
            derivedEvents.union(stateTransitionEvents).addSink(kafkaSink);
        }
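A side note on this fan-out (my observation, not a claim from the thread): since each DeviceFilter is applied to the full eventStream, the number of predicate evaluations scales as TOTAL_DEVICES times the number of events. A plain-Java sketch of that cost, with an illustrative stand-in for DeviceFilter:

```java
import java.util.List;

public class FanOutCost {
    // Counts how many filter checks N per-device filters perform over one shared stream.
    static int countFilterEvaluations(int totalDevices, List<Integer> eventDeviceIds) {
        int evaluations = 0;
        for (int filterId = 0; filterId < totalDevices; filterId++) {
            for (int deviceId : eventDeviceIds) {
                evaluations++; // every filter inspects every event; only matches pass downstream
            }
        }
        return evaluations;
    }

    public static void main(String[] args) {
        // 3 devices, 6 events: 18 evaluations to route 6 records.
        System.out.println(countFilterEvaluations(3, List.of(0, 1, 2, 0, 1, 2))); // 18
    }
}
```

With N ~ thousands of devices this quadratic-style routing cost is one more reason a keyBy on the device id (which hashes each event once) scales better than N filters.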

Comments?

Thanks,
Shailesh



Re: Correlation between data streams/operators and threads

Shailesh Jain
Bump.

On Wed, Nov 15, 2017 at 12:34 AM, Shailesh Jain <[hidden email]> wrote:
1. Single data source because I have one kafka topic where all events get published. But I am creating multiple data streams by applying a series of filter operations on the single input stream, to generate device specific data stream, and then assigning the watermarks on that stream. Will this not result in downstream operators (for a particular device specific stream) to get correct device specific watermarks?

Job code:

// eventStream initially contains all events from all devices

        for (int i = 0; i < TOTAL_DEVICES; i++) {
            DataStream<Event> deviceOnlyEvents = eventStream.filter(new DeviceFilter(i))
                    .assignTimestampsAndWatermarks(new EventTimeStampExtractor(Time.milliseconds(1))).setParallelism(1);
            // apply CEP operators, and generate derived events
            DataStream<Event> derivedEvents = PatternCreator.addPatternsOnStream(deviceOnlyEvents, appId);
            // also pass the stream through a process function (this gets chained with the source operator as you had mentioned above)
            DataStream<Event> stateTransitionEvents = deviceOnlyEvents.process(new StateMachineOperator(appId)).setParallelism(1);
            // add sink to the new event streams
            derivedEvents.union(stateTransitionEvents).addSink(kafkaSink);
        }

Comments?

Thanks,
Shailesh

On Tue, Nov 14, 2017 at 6:57 PM, Piotr Nowojski <[hidden email]> wrote:
1. It seems like you have one single data source, not one per device. That might make a difference. Single data source followed by comap might create one single operator chain. If you want to go this way, please use my suggested solution c), since you will have troubles with handling watermarks anyway with single data source.

3. Nico, can you take a look at this one? Isn’t this a blob server issue?

Piotrek

On 14 Nov 2017, at 11:35, Shailesh Jain <[hidden email]> wrote:

1. Okay, I understand. My code is similar to what you demonstrated. I have attached a snap of my job plan visualization.

3. Have attached the logs and exception raised (15min - configured akka timeout) after submitting the job.

Thanks,
Shailesh


On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

1. 
I’m not sure what is your code. However I have tested it and here is the example with multiple streams in one job:
As expected it created 5 source threads (checked in the debugger) and is printing 5 values to the output every seconds, so clearly those 5 sources are executed simultaneously.

Number of operators is not related to the number of threads. Number of operator chains is. Simple pipelines like source -> map -> filter -> sink will be chained and executed in one threads, please refer to the documentation link in one of my earlier response.

Can you share your job code?

2. Good point, I forgot to mention that. The job in my example will have 5 operator chains, but because of task slot sharing, they will share one single task slot. In order to distribute such job with parallelism 1 across the cluster you have to define different slot sharing groups per each chain:
Just set it on the sources.

3. Can you show the logs from job manager and task manager?

4. As long as you have enough heap memory to run your application/tasks there is no upper limit for number of task slots.

Piotrek 

On 14 Nov 2017, at 07:26, Shailesh Jain <[hidden email]> wrote:

Hi Piotrek,

I tried out option 'a' mentioned above, but instead of separate jobs, I'm creating separate streams per device. Following is the test deployment configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):

akka.client.timeout 15 min
jobmanager.heap.mb 1024
jobmanager.rpc.address localhost
jobmanager.rpc.port 6123
jobmanager.web.port 8081
metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port 8789
metrics.reporters jmx
parallelism.default 1
taskmanager.heap.mb 1024
taskmanager.memory.preallocate false
taskmanager.numberOfTaskSlots 4

The number of Operators per device stream is 4 (one sink function, 3 CEP operators).

Observations (and questions):

1. No. of threads (captured through JMX) is almost the same as the total number of operators being created. This clears my original question in this thread.

2. Even when the number of task slots is 4, on web ui, it shows 3 slots as free. Is this expected? Why are the subtasks not being distributed across slots?

3. Job deployment hangs (never switches to RUNNING) when the number of devices is greater than 5. Even on increasing the akka client timeout, it does not help. Will separate jobs being deployed per device instead of separate streams help here?

4. Is there an upper limit on number task slots which can be configured? I know that my operator state size at any given point in time would not be very high, so it looks OK to deploy independent jobs which can be deployed on the same task manager across slots.

Thanks,
Shailesh


On Mon, Nov 13, 2017 at 7:21 PM, Piotr Nowojski <[hidden email]> wrote:
Sure, let us know if you have other questions or encounter some issues.

Thanks, Piotrek


On 13 Nov 2017, at 14:49, Shailesh Jain <[hidden email]> wrote:

Thanks, Piotr. I'll try it out and will get back in case of any further questions.

Shailesh

On Fri, Nov 10, 2017 at 5:52 PM, Piotr Nowojski <[hidden email]> wrote:
1.  It’s a little bit more complicated then that. Each operator chain/task will be executed in separate thread (parallelism
 Multiplies that). You can check in web ui how was your job split into tasks.

3. Yes that’s true, this is an issue. To preserve the individual watermarks/latencies (assuming that you have some way to calculate them individually per each device), you could either:

a) have separate jobs per each device with parallelism 1. Pros: independent failures/checkpoints, Cons: resource usage (number of threads increases with number of devices, there are also other resources consumed by each job), efficiency, 
b) have one job with multiple data streams. Cons: resource usage (threads)
c) ignore Flink’s watermarks, and implement your own code in place of it. You could read all of your data in single data stream, keyBy partition/device and manually handle watermarks logic. You could either try to wrap CEP/Window operators or copy/paste and modify them to suite your needs. 

I would start and try out from a). If it work for your cluster/scale then that’s fine. If not try b) (would share most of the code with a), and as a last resort try c).

Kostas, would you like to add something?

Piotrek

On 9 Nov 2017, at 19:16, Shailesh Jain <[hidden email]> wrote:

On 1. - is it tied specifically to the number of source operators or to the number of Datastream objects created. I mean does the answer change if I read all the data from a single Kafka topic, get a Datastream of all events, and the apply N filters to create N individual streams?

On 3. - the problem with partitions is that watermarks cannot be different per partition, and since in this use case, each stream is from a device, the latency could be different (but order will be correct almost always) and there are high chances of loosing out on events on operators like Patterns which work with windows. Any ideas for workarounds here?


Thanks,
Shailesh

On 09-Nov-2017 8:48 PM, "Piotr Nowojski" <[hidden email]> wrote:
Hi,

1. 

The number of threads executing would be, roughly speaking, equal to the number of input data streams multiplied by the parallelism.

2. 
Yes, you could dynamically create more data streams at the job startup.

3.
Running 10000 independent data streams on a small cluster (couple of nodes) will definitely be an issue, since even with parallelism set to 1, there would be quite a lot of unnecessary threads. 

It would be much better to treat your data as a single data input stream with multiple partitions. You could assign partitions between source instances based on parallelism. For example with parallelism 6:
- source 0 could get partitions 0, 6, 12, 18
- source 1, could get partitions 1, 7, …
- source 5, could get partitions 5, 11, ...
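The assignment listed above is simply partition index modulo parallelism. As a plain-Java illustration (class and method names are made up, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignment {
    /** Partitions handled by one source subtask under round-robin assignment. */
    static List<Integer> partitionsFor(int sourceIndex, int parallelism, int totalPartitions) {
        List<Integer> assigned = new ArrayList<>();
        // Every parallelism-th partition, starting from this subtask's index.
        for (int p = sourceIndex; p < totalPartitions; p += parallelism) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // With parallelism 6 and 24 partitions, source 0 gets 0, 6, 12, 18.
        System.out.println(partitionsFor(0, 6, 24)); // [0, 6, 12, 18]
    }
}
```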

Piotrek












Re: Correlation between data streams/operators and threads

Nico Kruber
In reply to this post by Piotr Nowojski
regarding 3.
a) The taskmanager logs are missing, are there any?
b) Also, the JobManager logs say you have 4 slots available in total - is this
enough for your 5 devices scenario?
c) The JobManager log, however, does not really reveal what it is currently
doing, can you set the log level to DEBUG to see more?
d) Also, do you still observe CPU load during the 15min as an indication that
it is actually doing something?
e) During this 15min period where apparently nothing happens, can you provide
the output of "jstack <jobmanager_pid>" (with the PID of your JobManager)?
f) You may further be able to debug into what is happening by running this in
your IDE in debug mode and pause the execution when you suspect it to hang.


Nico

On Tuesday, 14 November 2017 14:27:36 CET Piotr Nowojski wrote:

> 3. Nico, can you take a look at this one? Isn’t this a blob server issue?
>
> Piotrek
>
> > On 14 Nov 2017, at 11:35, Shailesh Jain <[hidden email]>
> > wrote:
> >
> > 3. Have attached the logs and exception raised (15min - configured akka
> > timeout) after submitting the job.
> >
> > Thanks,
> > Shailesh
> >
> >
> > On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <[hidden email]
> > <mailto:[hidden email]>> wrote: Hi,
> >
> > 3. Can you show the logs from job manager and task manager?
> >
> >> On 14 Nov 2017, at 07:26, Shailesh Jain <[hidden email]
> >> <mailto:[hidden email]>> wrote:
> >>
> >> Hi Piotrek,
> >>
> >> I tried out option 'a' mentioned above, but instead of separate jobs, I'm
> >> creating separate streams per device. Following is the test deployment
> >> configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):
> >>
> >> akka.client.timeout 15 min
> >> jobmanager.heap.mb 1024
> >> jobmanager.rpc.address localhost
> >> jobmanager.rpc.port 6123
> >> jobmanager.web.port 8081
> >> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
> >> metrics.reporter.jmx.port 8789
> >> metrics.reporters jmx
> >> parallelism.default 1
> >> taskmanager.heap.mb 1024
> >> taskmanager.memory.preallocate false
> >> taskmanager.numberOfTaskSlots 4
> >>
> >> The number of Operators per device stream is 4 (one sink function, 3 CEP
> >> operators).
> >>
> >> Observations (and questions):
> >>
> >> 3. Job deployment hangs (never switches to RUNNING) when the number of
> >> devices is greater than 5. Even on increasing the akka client timeout,
> >> it does not help. Will separate jobs being deployed per device instead
> >> of separate streams help here?
> >>
> >> Thanks,
> >> Shailesh


Re: Correlation between data streams/operators and threads

Piotr Nowojski
Sorry for not responding but I was away.

Regarding 1.

One source operator, followed by multiple tasks with parallelism 1 (as visible in your screenshot), that share the same resource group will collapse into one task slot - only one TaskManager will execute your entire job.


Because all of your events are written into one Kafka topic, the previously proposed solutions A) (multiple jobs) and B) (one job with multiple sources) cannot work. In that case what you have to do is either:

D) set parallelism as you wish in the environment, read from Kafka, keyBy device type, split the stream by filtering by device type (or using side outputs), perform your logic

This will create TOTAL_DEVICES data streams after the keyBy on each machine, and the filtering will cost you (its cost is linear in TOTAL_DEVICES), but it should be the easiest solution.
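To illustrate why keyBy helps here: routing is a pure function of the key, so all events of one device always land on the same downstream instance, no matter how many source instances read them. A simplified model of that routing in plain Java (Flink’s real scheme hashes into key groups, but the property shown is the same):

```java
public class KeyRouting {
    /**
     * Simplified stand-in for keyBy routing: hash of the key modulo parallelism.
     * (Flink's real scheme hashes into key groups, but the property is the same.)
     */
    static int subtaskFor(String deviceId, int parallelism) {
        return Math.floorMod(deviceId.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // Routing is a pure function of the key, so every event of "device-42"
        // lands on the same downstream subtask instance.
        int first = subtaskFor("device-42", 6);
        for (int i = 0; i < 100; i++) {
            if (subtaskFor("device-42", 6) != first) throw new AssertionError();
        }
        System.out.println("device-42 always routed to subtask " + first);
    }
}
```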

E) set parallelism as you wish, read from Kafka, keyBy device type, write custom operators with custom logic handling watermarks using KeyedState

However, I would strongly suggest reconsidering:

F) ignore the issue of assigning different watermarks per device stream and just assign the minimum across all of the devices. It would be the easiest to implement.
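Option F boils down to advancing a single global watermark as the minimum of the latest timestamps seen per device. A plain-Java sketch (names are illustrative, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of option F: one global watermark = minimum over all devices. */
public class MinWatermark {
    private final Map<String, Long> latestPerDevice = new HashMap<>();

    /** Record a device's latest event time and return the global (minimum) watermark. */
    public long onEvent(String deviceId, long eventTimestamp) {
        latestPerDevice.merge(deviceId, eventTimestamp, Math::max);
        long min = Long.MAX_VALUE;
        for (long t : latestPerDevice.values()) {
            min = Math.min(min, t);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermark w = new MinWatermark();
        w.onEvent("fastDevice", 100);
        // The slowest device drags event time down for everyone:
        System.out.println(w.onEvent("slowDevice", 50)); // 50
    }
}
```

The trade-off is that the slowest device holds back event time for all devices, which is why this is the easiest option to implement rather than the most precise one.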

Piotrek



Re: Correlation between data streams/operators and threads

Shailesh Jain
In reply to this post by Nico Kruber
a) Nope, there are no taskmanager logs, the job never switches to RUNNING state.

b) I think so, because even when I start the job with 4 devices, only 1 slot is used, and 3 are free.

c) Attached

d) Attached

e) I'll try the debug mode in Eclipse.

Thanks,
Shailesh



flink-shailesh-jobmanager-0-shailesh.log (89K) Download Attachment
jstackOutput (62K) Download Attachment

Re: Correlation between data streams/operators and threads

Shailesh Jain
In reply to this post by Piotr Nowojski
Thanks for your time in helping me here.

So as long as the parallelism of my kafka source and sink operators is 1, all the subsequent operators (multiple filters to create multiple streams, and then individual CEP and Process operators per stream) will be executed in the same task slot?

I cannot take approach F as the entire business logic revolves around event timing.

Regarding approach D, I'm not sure how this is different from the current approach I had provided the code for above, and will it solve this problem of different data streams not getting distributed across slots?

Thanks again,
Shailesh



Re: Correlation between data streams/operators and threads

Piotr Nowojski
So as long as the parallelism of my kafka source and sink operators is 1, all the subsequent operators (multiple filters to create multiple streams, and then individual CEP and Process operators per stream) will be executed in the same task slot? 

Yes, unless you specify a different resource sharing group for the subsequent operators. 

Regarding approach D, I'm not sure how this is different from the current approach I had provided the code for above, and will it solve this problem of different data streams not getting distributed across slots?

The difference is huge. Without keyBy you cannot have multiple instances (parallelism > 1) of the source and filtering operators (unless you create a separate Kafka partition per device, which in your case would solve a lot of problems, btw). The solution that you showed earlier will simply not scale beyond one machine. You could distribute your business logic among as many machines as you want, but there would always be a potential bottleneck at the single source/filtering operations. With keyBy you could have multiple source operators, and keyBy would ensure that events from the same device are always processed by the same task/machine.

Piotrek





Re: Correlation between data streams/operators and threads

Shailesh Jain
Understood. Thanks a lot!

I'll try out the keyBy approach first.

Shailesh







Re: Correlation between data streams/operators and threads

Nico Kruber
In reply to this post by Shailesh Jain
Hi Shailesh,
your JobManager log suggests that this same JVM instance actually contains a
TaskManager as well (sorry for not noticing earlier). Also, this time there is
nothing regarding the BlobServer/BlobCache, but it looks like the TaskManager
may think the JobManager is down.
Can you try with "start-cluster.sh" instead?

Nico
