some questions about data skew

jester jim
Hi,
I am running a program that monitors the delay of a stream by counting records per minute of delay. This is my code:
  .map(new RichMapFunction[String, (Long, Int)] {
    override def map(in: String): (Long, Int) = {
      try {
        val arr = in.split("\\|")
        // Delay in whole minutes, computed from field 10 (assumed to be an epoch timestamp in seconds), paired with a count of 1.
        ((System.currentTimeMillis() / 1000 - arr(10).trim.toLong) / 60, 1)
      } catch {
        case e: Exception =>
          // Log the offending record; the null result is removed by the filter below.
          System.out.println("data has been dropped: " + in)
          null
      }
    }
  }).slotSharingGroup("kafkaSource").setParallelism(200)
  .filter(item => item != null && item._1 >= 0).slotSharingGroup("kafkaSource").setParallelism(200)

signalSource.keyBy(f => f._1)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce { (e1, e2) => (e1._1, e1._2 + e2._2) }.setParallelism(20).slotSharingGroup("Delay")
  .addSink(new OracleSink).setParallelism(1).slotSharingGroup("OracleSink").name("OracleSinkDelay")
But there is a problem: when the data is not delayed, keys 1, 2, 3, 4 and 5 receive so much data that the backpressure is always 1. Is there any way to avoid this?
Please give me some advice. Thank you so much.

Re: some questions about data skew

Dawid Wysakowicz-2

Hi,

To improve the processing of skewed data, you could introduce an artificial pre-aggregation step. Add an artificial, uniformly distributed secondary key, calculate your aggregates on (original key, secondary uniform key), and then do the final aggregation per original key in an additional step.
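
The two steps can be sketched with plain Scala collections rather than Flink operators, just to show the shape of the computation. The names `SaltBuckets`, `preAggregate` and `finalAggregate` and the bucket count of 20 are assumptions made for illustration; they do not come from the job above.

```scala
import scala.util.Random

object SaltedAggregation {
  // Number of artificial sub-keys per original key (assumed value, chosen for illustration).
  val SaltBuckets = 20

  // Step 1: attach a uniformly distributed secondary key and pre-aggregate
  // on (original key, secondary key), so a hot key is split into several groups.
  def preAggregate(records: Seq[(Long, Int)], rnd: Random): Map[(Long, Int), Int] =
    records
      .map { case (key, count) => ((key, rnd.nextInt(SaltBuckets)), count) }
      .groupBy(_._1)
      .map { case (saltedKey, group) => saltedKey -> group.map(_._2).sum }

  // Step 2: drop the secondary key and combine the partial sums per original key.
  def finalAggregate(partials: Map[(Long, Int), Int]): Map[Long, Int] =
    partials.toSeq
      .groupBy { case ((key, _), _) => key }
      .map { case (key, group) => key -> group.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // A skewed input: key 1 is hot, key 42 is rare.
    val records = Seq.fill(1000)((1L, 1)) ++ Seq.fill(10)((42L, 1))
    val partials = preAggregate(records, new Random(7))
    val totals = finalAggregate(partials)
    println(s"partial groups: ${partials.size}, totals: $totals")
  }
}
```

The final totals are the same as aggregating directly on the original key; the difference is that the heavy first step is spread over up to `SaltBuckets` groups per key, and the second step only has to combine that many partial sums per key. In a streaming job, each step would presumably be its own keyed aggregation, the first keyed on the pair and the second on the original key.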

Best,

Dawid

On 06/05/2021 09:24, jester jim wrote:
[...]
