Parallelism and keyed streams

Parallelism and keyed streams

Nicholas Walton
I have a stream of tuples <channel: Int, index: Long, value: Double>, which I form into a keyedStream using keyBy on channel. I then need to process each channel in parallel. Each parallel stream must be processed in strict sequential order by index to calculate the ratios value(index)/value(index-1). If I set parallelism to 1 all is well: each channel is processed in order of index 1, 2, 3, 4, …

My problem is that when I set parallelism to a value greater than 1, each channel's keyedStream appears to be split across multiple processes. So a channel may be processed wrongly, for example as value(2), value(5), value(6), value(9), …

The number of channels N is unknown. So how do I rig up N processing streams with an unknown parallelism so that each stream processes its channel by strictly increasing index v(1), v(2), …, v(t), v(t+1), …, v(t+n)?

Thanks in advance

Nick Walton
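
For reference, the result being asked for can be written down on a finite collection. This is only a sketch of the intended per-channel computation in plain Scala, not Flink code; the `Reading` case class and `RatioSketch` object are hypothetical names introduced for illustration.

```scala
// Hypothetical record type mirroring the <channel, index, value> tuples.
final case class Reading(channel: Int, index: Long, value: Double)

object RatioSketch {
  /** For each channel, sort by index and divide each value by its predecessor. */
  def ratiosByChannel(readings: Seq[Reading]): Map[Int, Seq[(Long, Double)]] =
    readings.groupBy(_.channel).map { case (channel, rs) =>
      val ordered = rs.sortBy(_.index)
      // Zipping with the tail yields (value(index-1), value(index)) pairs.
      val ratios = ordered.zip(ordered.drop(1)).map { case (prev, cur) =>
        (cur.index, cur.value / prev.value)
      }
      channel -> ratios
    }

  def main(args: Array[String]): Unit = {
    // Records arrive interleaved and out of order across two channels.
    val input = Seq(
      Reading(1, 2L, 4.0), Reading(2, 1L, 10.0), Reading(1, 1L, 2.0),
      Reading(2, 2L, 5.0), Reading(1, 3L, 6.0))
    ratiosByChannel(input).toSeq.sortBy(_._1).foreach { case (ch, rs) =>
      println(s"channel $ch: $rs")
    }
  }
}
```

On an unbounded stream there is no global sort available, which is what makes the question non-trivial.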
RE: Parallelism and keyed streams

Martin, Nick
Is value(index-1) stored in Keyed State, or just a local variable inside the operator?
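
The distinction matters because one parallel operator instance handles many keys. A rough plain-Scala simulation of the difference is below; the mutable map only stands in for Flink keyed state, and `StateSketch` with both of its methods is a hypothetical name introduced here.

```scala
import scala.collection.mutable

object StateSketch {
  type Rec = (Int, Long, Double) // (channel, index, value)

  /** One `prev` shared by every channel handled by this operator instance. */
  def withLocalVariable(records: Seq[Rec]): Seq[Rec] = {
    var prev: Option[Double] = None
    records.flatMap { case (ch, idx, v) =>
      val out = prev.map(p => (ch, idx, v / p)).toList
      prev = Some(v) // overwritten by whichever channel comes next
      out
    }
  }

  /** One `prev` per channel, standing in for keyed state. */
  def withPerKeyState(records: Seq[Rec]): Seq[Rec] = {
    val prevByKey = mutable.Map.empty[Int, Double]
    records.flatMap { case (ch, idx, v) =>
      val out = prevByKey.get(ch).map(p => (ch, idx, v / p)).toList
      prevByKey(ch) = v
      out
    }
  }
}
```

With channels 1 and 2 interleaved, the local-variable version divides a channel-2 value by a channel-1 value, while the per-key version only ever compares values of the same channel.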

-----Original Message-----
From: Nicholas Walton [mailto:[hidden email]]
Sent: Monday, July 16, 2018 1:33 PM
To: [hidden email]
Subject: Parallelism and keyed streams

------------------------------------------------------------------------------

Notice: This e-mail is intended solely for use of the individual or entity to which it is addressed and may contain information that is proprietary, privileged and/or exempt from disclosure under applicable law. If the reader is not the intended recipient or agent responsible for delivering the message to the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. This communication may also contain data subject to U.S. export laws. If so, data subject to the International Traffic in Arms Regulation cannot be disseminated, distributed, transferred, or copied, whether incorporated or in its original form, to foreign nationals residing in the U.S. or abroad, absent the express prior approval of the U.S. Department of State. Data subject to the Export Administration Act may not be disseminated, distributed, transferred or copied contrary to U. S. Department of Commerce regulations. If you have received this communication in error, please notify the sender by reply e-mail and destroy the e-mail message and any physical copies made of the communication.
 Thank you.
*********************
Re: Parallelism and keyed streams

Nicholas Walton
Martin,

To clarify things, the code causing the issue is here; nothing clever. The code fails at the line in bold. The Long index values are set earlier in sequence 1, 2, 3, 4, 5, 6, 7, …

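// Divide field 3 by field 4 and add 2.0; output is (channel, index, scaled value, original value).
val scaledReadings: DataStream[(Int, Long, Double, Double)] = maxChannelReading
  .keyBy(0) // key by channel
  .map { in =>
    LOG.info(s"scaledReadings $in")
    (in._1, in._2, in._3 / in._4 + 2.0D, in._3)
  }

// Sliding count window per channel: size 100, slide 99.
val logRatioWindow: DataStream[(Int, Long, Int, Double)] = scaledReadings
  .keyBy(0)
  .countWindow(100, 99)
  .process(new logRatioWindowFunction())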


and

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector
import scala.math.{E, log, rint}

class logRatioWindowFunction extends ProcessWindowFunction[(Int, Long, Double, Double), (Int, Long, Int, Double), Tuple, GlobalWindow] {

  override def process(key: Tuple, context: Context, input: Iterable[(Int, Long, Double, Double)], out: Collector[(Int, Long, Int, Double)]): Unit = {

    val a: Array[(Int, Long, Double, Double)] = input.toArray
    val ch = a(0)._1
    val s = a(0)._2
    val l = a.length

    if (l < 100) Job.LOG.info(s"Log ratio window length $l on channel $ch at sample $s")

    // Every index must be exactly one greater than its predecessor in the window.
    for (i <- 1 until a.length) assert(a(i)._2 == a(i - 1)._2 + 1, "logRatioWindowFunction:Failure non-monotonic indexes " + a(i - 1)._2 + " and " + a(i)._2)

    // Only full windows of 100 elements produce output.
    if (l == 100) {
      for (i <- 0 to l - 2) {
        // Log of (e + ratio of consecutive scaled values), times 100, rounded to an Int.
        val v: Int = rint(100 * log(E + a(i + 1)._3 / a(i)._3)).toInt
        assert(v > 0, "Bad minhash in medianLogRatioWindowFunction " + v)
        Job.LOG.debug("logRatioWindowFunction [" + a(i + 1)._1 + ", " + a(i + 1)._2 + ", " + v + ", " + a(i + 1)._4 + "]")
        out.collect((a(i + 1)._1, a(i + 1)._2, v, a(i + 1)._4))
      }
      Job.LOG.debug("logRatioWindowFunction [" + a.head._1 + ", " + a.head._2 + " ... " + a.last._2 + "] collected")
    }
  }

}
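
The per-pair formula used inside the window function can be tried in isolation. The sketch below applies it to a plain sequence of values that is assumed to be already ordered by index; `LogRatioSketch` and `logRatios` are hypothetical names, not part of the job above.

```scala
import scala.math.{E, log, rint}

object LogRatioSketch {
  /** rint(100 * ln(e + next/prev)) for each consecutive pair of values. */
  def logRatios(values: Seq[Double]): Seq[Int] =
    values.zip(values.drop(1)).map { case (prev, next) =>
      rint(100 * log(E + next / prev)).toInt
    }

  def main(args: Array[String]): Unit =
    // Ratios 1.0 and 2.0 give ln(e + 1) and ln(e + 2), scaled by 100 and rounded.
    println(logRatios(Seq(1.0, 1.0, 2.0)))
}
```

For positive values the ratio is positive, so the argument of the logarithm exceeds e and every result is above 100. The result is only meaningful if the pairs really are neighbours by index, which is what the monotonicity assertion checks.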


On 17 Jul 2018, at 00:15, Martin, Nick <[hidden email]> wrote:

Is value(index-1) stored in Keyed State, or just a local variable inside the operator?

Re: Parallelism and keyed streams

Fabian Hueske-2
Hi,

Flink guarantees order only within a partition. For example, if you have the program map_1 -> map_2 and both map functions run with parallelism 4, the order of records in each of the 4 partitions is not changed.
In case of a shuffle (such as a keyBy or change in parallelism) records are shipped to the same downstream task in the same order as they are produced by the shipping task.
However, the receiving task merges streams of received records from multiple sending tasks without considering the order across parallel streams.
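
A small plain-Scala simulation of that merge behaviour, as a sketch only: two sending tasks each emit records for the same channel in index order, and the receiver interleaves them. The round-robin interleaving is an assumption chosen for determinism (the real arrival order depends on timing), and `MergeSketch` and its members are hypothetical names.

```scala
object MergeSketch {
  type Rec = (String, Long) // (sending task, index)

  /** Alternate between the two inputs, keeping each input's own order. */
  def interleave[A](a: List[A], b: List[A]): List[A] = (a, b) match {
    case (x :: xs, y :: ys) => x :: y :: interleave(xs, ys)
    case (Nil, ys)          => ys
    case (xs, Nil)          => xs
  }

  /** True if the indexes are strictly increasing. */
  def isSorted(indexes: Seq[Long]): Boolean =
    indexes.zip(indexes.drop(1)).forall { case (p, n) => p < n }

  val senderA: List[Rec] = List(1L, 2L, 5L, 6L).map(i => ("A", i))
  val senderB: List[Rec] = List(3L, 4L, 7L, 8L).map(i => ("B", i))

  // What the receiving task sees for this key: 1, 3, 2, 4, 5, 7, 6, 8.
  val merged: List[Rec] = interleave(senderA, senderB)
}
```

Each sender's records stay in order inside `merged`, but the merged sequence as a whole is not sorted by index.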

This means that you have to sort the data manually if the scaled-reading map function receives data for the same key from different source tasks.
Sorting streaming data is of course not as straightforward as sorting finite sets, but you can do it with a process function and timers, i.e., you collect data for a certain amount of time and then sort it.
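
The collect-then-sort idea can be sketched without any Flink dependency. In the plain-Scala sketch below, `add` plays the role of the per-record callback and `onTimer` the role of the timer callback; the class, its method names, and the in-memory map are all assumptions for illustration. In a real job the buffer would presumably live in keyed state.

```scala
import scala.collection.mutable

object SortBufferSketch {
  type Rec = (Int, Long, Double) // (channel, index, value)

  /** Per-channel buffer that is drained in index order when a simulated timer fires. */
  final class SortBuffer {
    private val pending = mutable.Map.empty[Int, mutable.ArrayBuffer[Rec]]

    /** Called for every incoming record: just remember it. */
    def add(rec: Rec): Unit =
      pending.getOrElseUpdate(rec._1, mutable.ArrayBuffer.empty[Rec]) += rec

    /** Called when the timer fires: emit each channel's records sorted by index. */
    def onTimer(): List[Rec] = {
      val out = pending.toList.sortBy(_._1).flatMap { case (_, recs) =>
        recs.sortBy(_._2).toList
      }
      pending.clear()
      out
    }
  }
}
```

A possible use: call `add` for each arriving record and `onTimer` once per collection interval. A record that arrives after its interval has been flushed is still emitted out of order, so the interval length trades latency against how much disorder can be repaired.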

Best, Fabian



2018-07-17 10:06 GMT+02:00 Nicholas Walton <[hidden email]>: