withBroadcastSet for a DataStream missing?


Stavros Kontopoulos
Hi, I am new here...

I am trying to implement online k-means with Flink, as described here: https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html
I don't see a withBroadcastSet call anywhere for saving intermediate results. Is this currently supported for a DataStream?

Thanks,
Stavros

Re: withBroadcastSet for a DataStream missing?

Till Rohrmann

Hi Stavros,

you might be able to solve your problem using a CoFlatMap operation with iterations. One of the two inputs would be the iteration's feedback stream, on which you broadcast the model updates to every parallel operator instance. On the other input you would receive the data points you want to cluster. As output you would emit both the clustered points and the model updates. You then have to use the split and select functions to separate the output stream into model updates and output elements. It’s important to broadcast the model updates; otherwise not all operator instances have the same clustering model.

Cheers,
Till
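
To make the role of the broadcast concrete, here is a minimal plain-Scala simulation of the pattern described above. It does not use Flink at all: `Instance` merely stands in for one parallel instance of a two-input operator, where `onPoint` and `onUpdate` would roughly correspond to `flatMap1` and `flatMap2` of a `CoFlatMapFunction`. All names, the initial centroids, and the running-mean update are assumptions made for illustration, not part of the thread.

```scala
// Plain-Scala simulation, no Flink dependency. All names are hypothetical.
object BroadcastModelSketch {
  type Point = Vector[Double]
  type Model = Vector[Point] // one centroid per cluster

  sealed trait Out
  final case class Clustered(point: Point, cluster: Int) extends Out
  final case class ModelUpdate(cluster: Int, point: Point) extends Out

  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Stands in for one parallel instance of a two-input operator.
  final class Instance(var model: Model, var counts: Vector[Long]) {
    // Input 1: a data point. Emits the clustered point plus a model update.
    def onPoint(p: Point): Seq[Out] = {
      val k = model.indices.minBy(i => sqDist(model(i), p))
      Seq(Clustered(p, k), ModelUpdate(k, p))
    }

    // Input 2: a model update from the feedback edge (running mean of the cluster).
    def onUpdate(u: ModelUpdate): Unit = {
      val n = counts(u.cluster) + 1
      val c = model(u.cluster)
      val moved = c.zip(u.point).map { case (ci, xi) => ci + (xi - ci) / n }
      model = model.updated(u.cluster, moved)
      counts = counts.updated(u.cluster, n)
    }
  }

  // Returns the final model held by each simulated instance.
  def run(points: Seq[Point], parallelism: Int, broadcast: Boolean): Seq[Model] = {
    val init: Model = Vector(Vector(0.0, 0.0), Vector(10.0, 10.0))
    val instances = Vector.fill(parallelism)(new Instance(init, Vector(1L, 1L)))
    points.zipWithIndex.foreach { case (p, i) =>
      val target = instances(i % parallelism) // round-robin data partitioning
      target.onPoint(p).foreach {
        case u: ModelUpdate =>
          // With broadcast every instance sees the update; without it only one does.
          if (broadcast) instances.foreach(_.onUpdate(u)) else target.onUpdate(u)
        case _: Clustered => () // would go to the job's output
      }
    }
    instances.map(_.model)
  }
}
```

Running the same points with `broadcast = true` leaves every simulated instance with an identical model, while `broadcast = false` lets the models drift apart, which is the situation the reply warns about.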



Re: withBroadcastSet for a DataStream missing?

Stavros Kontopoulos
OK, thanks Till, I will give it a shot!


Re: withBroadcastSet for a DataStream missing?

Stavros Kontopoulos
I'm trying what you suggested. Is this what you had in mind? (This is just a skeleton of the logic, not the actual implementation.)

    val dataStream = ... // window-based stream

    val modelStream = ...

    val connected = dataStream.connect(modelStream)

    // Tag each element: true = data point, false = model update.
    val output = connected.map(
      (x: String) => true,
      (y: MyModel) => false
    ).iterate { iteration =>
      // broadcast returns a new stream, so its result has to be the one fed back.
      val feedback = iteration.filter(!_).broadcast
      (feedback, iteration.filter(x => x))
    }

    output.split(
      (b: Boolean) => if (b) List("true") else List("false")
    ).select("true")


I could save the model in the CoFlatMap, but ideally I need the same model everywhere. Does broadcast do that? From the documentation, I read that it sends the output to all parallel operators.
Is the iteration executed whenever there is data from the input window stream, or does it run independently, so that I can feed back my improved model (as in the DataSet case)?
If the latter holds, does that mean every operator has to process all partial updates from all operators before processing of the next window begins?

Thanks!



Re: withBroadcastSet for a DataStream missing?

Till Rohrmann

Hi Stavros,

yes that’s how you could do it.

broadcast will send the data to every downstream operator.

An element will be processed whenever it arrives at the iteration head. There is no synchronization.

A windowed stream cannot be the input for a connected stream. Thus, the window results have to be processed first, before they are fed into the iteration.

Cheers,
Till
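
As a rough illustration of what "no synchronization" implies, the following plain-Scala sketch (no Flink involved; `Event`, `DataPoint`, `NewCentroids`, and `assign` are hypothetical names, and the one-dimensional centroids are chosen only to keep it short) processes events strictly in arrival order. The same data point can land in a different cluster depending on whether a model update reached the operator before or after it.

```scala
// Plain-Scala sketch, no Flink dependency. All names are hypothetical.
object ArrivalOrderSketch {
  sealed trait Event
  final case class DataPoint(x: Double) extends Event
  final case class NewCentroids(cs: Vector[Double]) extends Event

  // Handles events one at a time in arrival order, with no barrier between rounds.
  // Returns the cluster index chosen for each data point.
  def assign(events: Seq[Event], initial: Vector[Double]): Seq[Int] = {
    var centroids = initial
    val out = Vector.newBuilder[Int]
    events.foreach {
      case NewCentroids(cs) => centroids = cs // model update replaces local state
      case DataPoint(x) =>
        out += centroids.indices.minBy(i => math.abs(centroids(i) - x))
    }
    out.result()
  }
}
```

With initial centroids `Vector(0.0, 10.0)`, the point `4.0` is assigned to cluster 0 if it arrives before `NewCentroids(Vector(0.0, 6.0))` and to cluster 1 if it arrives after, so any ordering guarantee has to be built by the job itself.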



Re: withBroadcastSet for a DataStream missing?

Stavros Kontopoulos
Hi Till,

That means you somehow have to wait to aggregate the broadcast data from all operators if you need all the computed results.
That is the case with the array of centroids, so you need to wait within the iteration. Is this correct?
Is it possible to nest an outer stream within iterate? That is, not just split the iteration stream but also join it with an external one. Is this a legal use?

Cheers,
Stavros
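
One possible way to express the waiting described above, sketched in plain Scala and not confirmed anywhere in this thread: buffer the partial results per round and release a merged value only once every parallel instance has reported. `Partial`, `RoundBuffer`, the round counter, and the single-centroid shape are all assumptions made for illustration; inside a Flink job this logic would have to live in the operator that receives the broadcast feedback.

```scala
// Plain-Scala sketch, no Flink dependency. All names are hypothetical.
object RoundBufferSketch {
  // A partial result from one parallel instance for one round:
  // the coordinate-wise sum of its points and how many points it saw.
  final case class Partial(round: Int, sum: Vector[Double], count: Long)

  final class RoundBuffer(parallelism: Int) {
    private var pending = Map.empty[Int, Vector[Partial]]

    // Returns the merged centroid once every instance has reported for that round,
    // and None while the round is still incomplete.
    def add(p: Partial): Option[Vector[Double]] = {
      val seen = pending.getOrElse(p.round, Vector.empty) :+ p
      if (seen.size < parallelism) {
        pending = pending.updated(p.round, seen)
        None
      } else {
        pending = pending - p.round
        val total = seen.map(_.count).sum
        val summed = seen.map(_.sum)
          .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
        Some(summed.map(_ / total)) // NaN if no instance saw any point
      }
    }
  }
}
```

For example, with `parallelism = 2`, adding `Partial(1, Vector(2.0, 4.0), 2)` yields `None`, and adding `Partial(1, Vector(4.0, 2.0), 1)` then yields `Some(Vector(2.0, 2.0))`. The sketch ignores failures and late or duplicate partials, which a real job would need to handle.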


Re: withBroadcastSet for a DataStream missing?

souibil
In reply to this post by Stavros Kontopoulos
Hi Stavros,

I have the same problem as you and I am trying to solve it. Did you find a solution in the meantime?
Thank you