Assigning operators to slots

AndreaKinn
Hi, first of all, sorry for the long post.
I have already read the documentation on parallelism and slots, and the related
API, but I still have some doubts about how to use them in practice.
My program essentially consists of three operations:

- get data from a kafka source
- perform a machine learning operator on the retrieved stream
- push out data to a cassandra sink

I'd like to investigate and try to implement them in two different
situations:


1) FIRST ONE

Imagine I have a single dual-core physical node and suppose I set
taskmanager.numberOfTaskSlots = number of cores (as suggested by the docs).

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/tonabble.png>

I suppose I can assign the operations to slots in a fixed way, as shown
in the figure. Is this possible?
Can I do that with the slotSharingGroup(groupname) method, or do I have to
call startNewChain() between the operators?
Example:

DataStream<MyEvent> stream = env
    .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
    .map(...)
    .slotSharingGroup("source");

or

DataStream<MyEvent> stream = env
    .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
    .startNewChain()
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
    .startNewChain()
    .map(...);


2) SECOND ONE

Imagine I have 3 dual core physical nodes.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t985/tonabble2.png>

I suppose I can reserve one physical node for each operation. Is this
possible?
In this case I honestly don't know how to implement that in code.
Moreover, I don't know whether it makes sense to set
taskmanager.numberOfTaskSlots = number of cores or to leave this choice to Flink.








Re: Assigning operators to slots

Aljoscha Krettek
Hi,

For the first question, I think both approaches should work. You only have to be careful with startNewChain(), because its behaviour can be somewhat unexpected: it specifies that a new chain should start with the operator on which you call startNewChain(), i.e. the operator immediately before the call, not the one after it. For example, in:

DataStream input = ...

input
  .map(...).name("map1")
  .map(...).name("map2")
  .startNewChain()   // applies to "map2", the operator it is called on
  .map(...).name("map3")

You will have one chain ("map1") and a second chain ("map2", "map3").
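
To make the grouping rule concrete, here is a toy model in plain Java with no Flink dependency. It is only a sketch of the rule described above for a linear pipeline: the names ChainModel, Op and chains are made up for illustration, and real chaining also depends on things this model ignores (parallelism, partitioning, slot sharing groups).

```java
import java.util.ArrayList;
import java.util.List;

public class ChainModel {
    /** One operator in a linear pipeline (hypothetical type, not a Flink class). */
    public static final class Op {
        final String name;
        final boolean startsNewChain; // true if startNewChain() was called on this operator

        public Op(String name, boolean startsNewChain) {
            this.name = name;
            this.startsNewChain = startsNewChain;
        }
    }

    /** Groups operators into chains: a flagged operator begins a new chain. */
    public static List<List<String>> chains(List<Op> pipeline) {
        List<List<String>> result = new ArrayList<>();
        for (Op op : pipeline) {
            if (result.isEmpty() || op.startsNewChain) {
                result.add(new ArrayList<>());
            }
            result.get(result.size() - 1).add(op.name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Op> pipeline = new ArrayList<>();
        pipeline.add(new Op("map1", false));
        pipeline.add(new Op("map2", true));  // startNewChain() follows map2 in the snippet
        pipeline.add(new Op("map3", false));
        System.out.println(chains(pipeline)); // [[map1], [map2, map3]]
    }
}
```

Moving the flag from "map2" to "map3" in this model gives [[map1, map2], [map3]], which is the split people often expect from the snippet above but do not get.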

For the second question, I think that to make sure each operator is on a separate machine you would have to set the number of slots per TaskManager to 1. This way your three machines give you 3 slots in total, and if you set the resource groups (slot sharing groups) or the chaining right, each operator will be in a different slot and therefore on a different machine.
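
As a rough check of the arithmetic, the sketch below counts the slots a job needs, assuming the usual rule that each slot sharing group needs as many slots as the highest parallelism among its operators. It is plain Java with no Flink dependency; SlotModel, requiredSlots and the group names "source", "ml" and "sink" are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SlotModel {
    /**
     * Slots needed by a job, assuming: sum over slot sharing groups of the
     * highest operator parallelism inside each group.
     * groups[i] is the group of operator i, parallelism[i] its parallelism.
     */
    public static int requiredSlots(String[] groups, int[] parallelism) {
        Map<String, Integer> maxPerGroup = new LinkedHashMap<>();
        for (int i = 0; i < groups.length; i++) {
            maxPerGroup.merge(groups[i], parallelism[i], Integer::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical groups for the Kafka source, the ML operator and the Cassandra sink.
        String[] groups = {"source", "ml", "sink"};
        int[] parallelism = {1, 1, 1};
        int machines = 3;
        int slotsPerTaskManager = 1;

        System.out.println(requiredSlots(groups, parallelism)); // 3 slots needed
        System.out.println(machines * slotsPerTaskManager);     // 3 slots available
    }
}
```

With one slot per machine and three groups at parallelism 1, the job needs exactly the three slots the cluster offers, so no two operators can end up on the same machine. If all three operators stayed in the same group, the same model would report a single slot.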

Best,
Aljoscha

Re: Assigning operators to slots

AndreaKinn
Nice, thank you for the reply.

So if I call slotSharingGroup(groupname) on the last operator, as here:

DataStream<MyEvent> stream = env
    .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
    .map(...)
    .slotSharingGroup("source");

is it applied to all the previous operators too? Or do I have to call it after
each operator?
That is, I want addSource, assignTimestampsAndWatermarks and map to reside in the same slot.



Re: Assigning operators to slots

AndreaKinn
UPDATE:

I'm trying to implement the version with one node and two task slots on my
laptop. I have also set this key in flink-conf.yaml:

taskmanager.numberOfTaskSlots: 2

but when I execute my program in the IDE I get:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration.

Parallelism is set to 1.

What could be the problem?



Re: Assigning operators to slots

Elias Levy
The execution within the IDE is most likely not loading the flink-conf.yaml file to read the configuration. When run from the IDE you get a LocalStreamEnvironment, which starts a LocalFlinkMiniCluster. LocalStreamEnvironment is created by StreamExecutionEnvironment.createLocalEnvironment without passing it any configuration. So none of StreamExecutionEnvironment, LocalStreamEnvironment, and LocalFlinkMiniCluster tries to read the config file.

This makes it difficult to test certain Flink features from within the IDE, as some configuration properties can't be set programmatically. For instance, you can't configure the external checkpoint URL in code. It can only be set in the config file. That means you can't run a job that turns on external checkpoints from within the IDE.

Ideally one of these components would try to load the config file when executing locally. You could then point it to the config file via the FLINK_CONF_DIR environment variable.
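
Until something like that exists, a workaround could look roughly like the sketch below: read the flat "key: value" entries from flink-conf.yaml yourself, using FLINK_CONF_DIR to find the file. This is plain Java with no Flink dependency. LocalConfLoader and load are hypothetical names, and the parsing is a simplification that only handles flat entries and '#' comments, not Flink's own loader.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalConfLoader {
    /** Reads flat "key: value" lines from flink-conf.yaml in the given directory. */
    public static Map<String, String> load(Path confDir) throws IOException {
        Map<String, String> conf = new LinkedHashMap<>();
        Path file = confDir.resolve("flink-conf.yaml");
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            // Drop everything after '#', then skip blank or malformed lines.
            String content = line.split("#", 2)[0].trim();
            int sep = content.indexOf(": ");
            if (sep <= 0) {
                continue;
            }
            conf.put(content.substring(0, sep).trim(), content.substring(sep + 2).trim());
        }
        return conf;
    }

    public static void main(String[] args) throws IOException {
        String dir = System.getenv("FLINK_CONF_DIR");
        if (dir == null) {
            System.out.println("FLINK_CONF_DIR is not set");
            return;
        }
        Map<String, String> conf = load(Paths.get(dir));
        System.out.println(conf.get("taskmanager.numberOfTaskSlots"));
    }
}
```

The resulting entries would still have to be copied into a Flink Configuration object and handed to the local environment. I believe StreamExecutionEnvironment.createLocalEnvironment has an overload that accepts a parallelism and a Configuration, but treat that as an assumption and check it against the Flink version you are using.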

