multiple streams with multiple actions - proper way?

Peter Ertl
Hello Flink People :-)


I am trying to get my head around Flink. Is it a supported use case to register multiple streams, possibly with more than one transformation/action per stream?


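import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
// Flink 1.3 location; later releases moved it to org.apache.flink.api.common.serialization
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object MultiStreamJob { // object name added for illustration

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "vmi:9092")

    // first stream: Kafka topic "foo"
    val ins = env.addSource(new FlinkKafkaConsumer010[String]("foo", new SimpleStringSchema(), prop))
      .map(s => "transformation-1: " + s)

    ins.map(s => "transformation-2:" + s).print() // one action
    ins.map(s => "transformation-3:" + s).print() // one more action
    ins.map(s => "transformation-4:" + s).print() // another action on the same stream

    // second, unrelated stream: Kafka topic "bar"
    val ins2 = env.addSource(new FlinkKafkaConsumer010[String]("bar", new SimpleStringSchema(), prop))
      .map(s => "transformation-5: " + s)

    ins2.map(s => "transformation-7:" + s).print() // action
    ins2.map(s => "transformation-6:" + s).print() // different action

    // both pipelines are submitted together as a single job
    env.execute("run all streams with multiple actions attached")
  }
}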


Is this program abusing Flink, or is this just how you are supposed to do things?

Also, how many threads will this program consume when running with parallelism = 4?


Best regards
Peter


Re: multiple streams with multiple actions - proper way?

Fabian Hueske
Hi Peter,

this kind of use case is supported, but it is best practice to split independent pipelines into individual jobs.
One reason for that is to isolate failures and restarts.
For example, I would split the program you posted into two programs, one for the "foo" topic and one for the "bar" topic. Depending on the complexity of the operations, you might also want to split it further.
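The split Fabian describes could look roughly like the following. This is a minimal sketch, assuming the Flink 1.3-era Scala DataStream API and the Kafka 0.10 connector from the original post; `TopicPipeline`, `build`, `kafkaProps`, `FooJob` and `BarJob` are hypothetical names chosen for illustration. Each object has its own `main` and its own `env.execute`, so each is submitted as a separate job and fails or restarts independently of the other.

```scala
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Hypothetical shared helper (not a Flink API): wiring reused by both jobs.
object TopicPipeline {
  // Kafka settings copied from the original post.
  def kafkaProps(): Properties = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "vmi:9092")
    prop
  }

  // Adds one source -> map -> several print sinks pipeline for a single topic.
  def build(env: StreamExecutionEnvironment, topic: String,
            firstStep: String, actions: Seq[String]): Unit = {
    val in = env
      .addSource(new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), kafkaProps()))
      .map(s => firstStep + ": " + s)
    // Each action is a separate branch fed by the same stream.
    actions.foreach(a => in.map(s => a + ":" + s).print())
  }
}

// Job 1 (hypothetical name): only the "foo" topic.
object FooJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    TopicPipeline.build(env, "foo", "transformation-1",
      Seq("transformation-2", "transformation-3", "transformation-4"))
    env.execute("foo pipeline")
  }
}

// Job 2 (hypothetical name): only the "bar" topic.
object BarJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    TopicPipeline.build(env, "bar", "transformation-5",
      Seq("transformation-6", "transformation-7"))
    env.execute("bar pipeline")
  }
}
```

Both objects can live in the same jar and be submitted one after the other, for example with `flink run -c FooJob <jar>` and `flink run -c BarJob <jar>`; the shared helper keeps the two programs from drifting apart while still giving each topic its own job.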

Best, Fabian

(In reply to Peter Ertl's message of 2017-07-29 18:51 GMT+02:00.)